In [205]:
#r "nuget: Microsoft.Azure.Storage.Common"
#r "nuget: Microsoft.Azure.Storage.Blob"
#r "nuget: Microsoft.Azure.Storage.File"
#r "nuget: Newtonsoft.Json"
#r "nuget: System.Data.SqlClient"

StatementMeta(DefaultPool, 28, 205, Finished, Available)

In [206]:
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using System.IO;
using Newtonsoft.Json;
using System.Data.SqlClient;
using System.Data;


StatementMeta(DefaultPool, 28, 206, Finished, Available)

In [207]:
/// <summary>
/// Root object of one Snapshot Serengeti season metadata file: the <c>info</c> header
/// followed by the <c>categories</c>, <c>images</c> and <c>annotations</c> arrays.
/// </summary>
public class SerengetiData
{
    /// <summary>Dataset-level header (<c>info</c>).</summary>
    [JsonProperty("info")] public Info Info { get; set; }

    /// <summary>Category id/name pairs (<c>categories</c>).</summary>
    [JsonProperty("categories")] public List<Category> Categories { get; set; }

    /// <summary>Per-image records (<c>images</c>).</summary>
    [JsonProperty("images")] public List<Image> Images { get; set; }

    /// <summary>Per-annotation records (<c>annotations</c>).</summary>
    [JsonProperty("annotations")] public List<Annotation> Annotations { get; set; }
}

/// <summary>
/// One entry of the <c>annotations</c> array in a season metadata file.
/// Property names map to the snake_case JSON keys via <see cref="JsonPropertyAttribute"/>.
/// </summary>
public class Annotation
{
    [JsonProperty("sequence_level_annotation")] public bool SequenceLevelAnnotation { get; set; }

    [JsonProperty("id")] public string Id { get; set; }

    /// <summary>Refers to <see cref="Category.Id"/>.</summary>
    [JsonProperty("category_id")] public long CategoryId { get; set; }

    [JsonProperty("seq_id")] public string SeqId { get; set; }

    [JsonProperty("season")] public string Season { get; set; }

    [JsonProperty("datetime")] public DateTimeOffset Datetime { get; set; }

    [JsonProperty("subject_id")] public string SubjectId { get; set; }

    // The following fields are typed as object; their JSON representation is not
    // pinned down here (NOTE(review): confirm against the dataset schema).
    [JsonProperty("count")] public object Count { get; set; }

    [JsonProperty("standing")] public object Standing { get; set; }

    [JsonProperty("resting")] public object Resting { get; set; }

    [JsonProperty("moving")] public object Moving { get; set; }

    [JsonProperty("interacting")] public object Interacting { get; set; }

    [JsonProperty("young_present")] public object YoungPresent { get; set; }

    /// <summary>Refers to <see cref="Image.Id"/>.</summary>
    [JsonProperty("image_id")] public string ImageId { get; set; }

    [JsonProperty("location")] public string Location { get; set; }
}

/// <summary>
/// One entry of the <c>categories</c> array: a numeric id and its display name.
/// </summary>
public class Category
{
    [JsonProperty("id")] public long Id { get; set; }

    [JsonProperty("name")] public string Name { get; set; }
}

/// <summary>
/// One entry of the <c>images</c> array in a season metadata file.
/// Property names map to the snake_case JSON keys via <see cref="JsonPropertyAttribute"/>.
/// </summary>
public class Image
{
    [JsonProperty("id")] public string Id { get; set; }

    [JsonProperty("file_name")] public string FileName { get; set; }

    [JsonProperty("frame_num")] public long FrameNum { get; set; }

    [JsonProperty("seq_id")] public string SeqId { get; set; }

    [JsonProperty("width")] public long Width { get; set; }

    [JsonProperty("height")] public long Height { get; set; }

    [JsonProperty("corrupt")] public bool Corrupt { get; set; }

    [JsonProperty("location")] public string Location { get; set; }

    [JsonProperty("seq_num_frames")] public long SeqNumFrames { get; set; }

    /// <summary>Value of the <c>datetime</c> key (presumably capture time — confirm).</summary>
    [JsonProperty("datetime")] public DateTimeOffset Datetime { get; set; }
}

/// <summary>
/// The <c>info</c> header object of a season metadata file.
/// </summary>
public class Info
{
    [JsonProperty("version")] public string Version { get; set; }

    [JsonProperty("description")] public string Description { get; set; }

    /// <summary>The <c>date_created</c> value, deserialized as an integer.</summary>
    [JsonProperty("date_created")] public long DateCreated { get; set; }

    [JsonProperty("contributor")] public string Contributor { get; set; }
}

StatementMeta(DefaultPool, 28, 207, Finished, Available)

In [208]:
// Shared notebook state, populated by InitStorageAndDb() and used by later cells.
CloudBlobContainer blobContainer;
CloudBlobDirectory blobDirectory;
SqlConnection sqlConn;

// Connection strings may be supplied through environment variables so secrets do not
// have to be pasted into the notebook; an empty string is used when a variable is unset.
var dbConnectionString = System.Environment.GetEnvironmentVariable("SERENGETI_SQL_CONNECTION_STRING") ?? "";
var storageConnectionString = System.Environment.GetEnvironmentVariable("SERENGETI_STORAGE_CONNECTION_STRING") ?? "";

/// <summary>
/// Resolves the <c>metadata</c> directory of the <c>snapshot-serengeti</c> blob container
/// and creates the (not yet opened) <see cref="SqlConnection"/> that the bulk-insert
/// helpers open and close around each operation.
/// </summary>
private void InitStorageAndDb()
{
    // Parse the storage account and get a blob client for the source container.
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageConnectionString);

    var blobClient = storageAccount.CreateCloudBlobClient();
    blobContainer = blobClient.GetContainerReference("snapshot-serengeti");
    blobDirectory = blobContainer.GetDirectoryReference("metadata");

    // Re-running this cell would otherwise leak the previously created connection.
    sqlConn?.Dispose();
    sqlConn = new SqlConnection(dbConnectionString);
}

StatementMeta(DefaultPool, 28, 208, Finished, Available)

In [209]:
/// <summary>
/// Reads a JSON blob and deserializes it into <typeparamref name="T"/> using Newtonsoft.Json.
/// </summary>
/// <typeparam name="T">Target type; <c>dynamic</c> yields a Newtonsoft JToken graph.</typeparam>
/// <param name="jsonBlob">Blob containing a single JSON document.</param>
/// <returns>The deserialized document.</returns>
private async Task<T> ReadJsonFile<T> (CloudBlockBlob jsonBlob)
{
    // Stream the blob straight into the deserializer instead of first downloading it
    // into a MemoryStream: this avoids holding the raw bytes and the deserialized
    // object graph in memory at the same time, and the MemoryStream size ceiling.
    using (var blobStream = await jsonBlob.OpenReadAsync())
    using (var streamReader = new StreamReader(blobStream))
    using (var jsonTextReader = new JsonTextReader(streamReader))
    {
        var jsonSerializer = new JsonSerializer();

        // JsonTextReader pulls tokens from the stream as the serializer consumes them.
        return jsonSerializer.Deserialize<T>(jsonTextReader);
    }
}


StatementMeta(DefaultPool, 28, 209, Finished, Available)

In [210]:
/// <summary>
/// Issues one CREATE TABLE statement per destination table on <c>sqlConn</c>.
/// Each statement runs independently: a failure (for example, the table already
/// exists) is printed and the remaining tables are still attempted.
/// </summary>
private async Task CreateTablesAsync()
{
    // Table name -> DDL; enumerated in insertion order.
    var commands = new Dictionary<string, string>
    {
        ["images"] = "CREATE TABLE images (id VARCHAR(255), file_name VARCHAR(255), frame_num INT, seq_id VARCHAR(255), width INT, height INT, corrupt BIT, location VARCHAR(255), seq_num_frames INT, datetime DATETIME);",
        ["categories"] = "CREATE TABLE categories (id INT, name VARCHAR(255));",
        ["annotations"] = "CREATE TABLE annotations ( id VARCHAR(255) NOT NULL, category_id INT NOT NULL, seq_id VARCHAR(255) NOT NULL, season VARCHAR(255) NOT NULL, datetime DATETIME NOT NULL, image_id VARCHAR(255) NOT NULL, location VARCHAR(255) NOT NULL );",
        ["train"] = "CREATE TABLE train ( location VARCHAR(255));",
        ["val"] = "CREATE TABLE val ( location VARCHAR(255));",
    };

    await sqlConn.OpenAsync();

    foreach (var (tableName, ddl) in commands)
    {
        using var createCommand = new SqlCommand(ddl, sqlConn);
        try
        {
            await createCommand.ExecuteNonQueryAsync();
            Console.WriteLine($"Table {tableName} created successfully.");
        }
        catch (Exception ex)
        {
            // Best-effort: report and move on to the next table.
            Console.WriteLine($"Error creating table {tableName}: {ex.Message}");
        }
    }

    await sqlConn.CloseAsync();
}

StatementMeta(DefaultPool, 28, 210, Finished, Available)

In [211]:
/// <summary>
/// Bulk-copies the given image records into the <c>images</c> table via <c>sqlConn</c>.
/// </summary>
/// <param name="images">Image records from one season file.</param>
private async Task BulkInsertImages(List<Image> images)
{
    // Build the in-memory table first so the connection is only open for the copy.
    // Column order matches the CREATE TABLE images statement (SqlBulkCopy maps by ordinal).
    var dataTable = new DataTable();
    dataTable.Columns.Add("id", typeof(string));
    dataTable.Columns.Add("file_name", typeof(string));
    dataTable.Columns.Add("frame_num", typeof(long));
    dataTable.Columns.Add("seq_id", typeof(string));
    dataTable.Columns.Add("width", typeof(long));
    dataTable.Columns.Add("height", typeof(long));
    dataTable.Columns.Add("corrupt", typeof(bool));
    dataTable.Columns.Add("location", typeof(string));
    dataTable.Columns.Add("seq_num_frames", typeof(long));
    dataTable.Columns.Add("datetime", typeof(DateTime));

    foreach (var image in images)
    {
        var row = dataTable.NewRow();
        row["id"] = image.Id;
        row["file_name"] = image.FileName;
        row["frame_num"] = image.FrameNum;
        row["seq_id"] = image.SeqId;
        row["width"] = image.Width;
        row["height"] = image.Height;
        row["corrupt"] = image.Corrupt;
        row["location"] = image.Location;
        row["seq_num_frames"] = image.SeqNumFrames;
        // Keep the recorded wall-clock value; DATETIME has no offset component.
        row["datetime"] = image.Datetime.DateTime;

        dataTable.Rows.Add(row);
    }

    await sqlConn.OpenAsync();
    try
    {
        using (var bulkCpy = new SqlBulkCopy(sqlConn))
        {
            bulkCpy.DestinationTableName = "images";

            // Native async copy instead of wrapping the synchronous WriteToServer in Task.Run.
            await bulkCpy.WriteToServerAsync(dataTable);
        }
    }
    finally
    {
        // Always close: sqlConn is shared, and leaving it open would make the next OpenAsync throw.
        await sqlConn.CloseAsync();
    }
}


StatementMeta(DefaultPool, 28, 211, Finished, Available)

In [212]:
/// <summary>
/// Bulk-copies the given categories into the <c>categories</c> table via <c>sqlConn</c>.
/// </summary>
/// <param name="categories">Category id/name pairs.</param>
private async Task BulkInsertCategories(List<Category> categories)
{
    // Column order matches the CREATE TABLE categories statement (SqlBulkCopy maps by ordinal).
    var dataTable = new DataTable();
    dataTable.Columns.Add("id", typeof(long));
    dataTable.Columns.Add("name", typeof(string));

    foreach (var category in categories)
    {
        var row = dataTable.NewRow();
        row["id"] = category.Id;
        row["name"] = category.Name;

        // NewRow() only creates a detached row; it must be added to Rows to be copied.
        dataTable.Rows.Add(row);
    }

    await sqlConn.OpenAsync();
    try
    {
        using (var bulkCpy = new SqlBulkCopy(sqlConn))
        {
            bulkCpy.DestinationTableName = "categories";
            await bulkCpy.WriteToServerAsync(dataTable);
        }
    }
    finally
    {
        // Always close: sqlConn is shared, and leaving it open would make the next OpenAsync throw.
        await sqlConn.CloseAsync();
    }
}


StatementMeta(DefaultPool, 28, 212, Finished, Available)

In [213]:
/// <summary>
/// Bulk-copies the given annotations into the <c>annotations</c> table via <c>sqlConn</c>.
/// Only the columns defined in the table are written; the behavioural fields
/// (count, standing, resting, ...) are not persisted.
/// </summary>
/// <param name="annotations">Annotation records from one season file.</param>
private async Task BulkInsertAnnotations(List<Annotation> annotations)
{
    // Column order matches the CREATE TABLE annotations statement (SqlBulkCopy maps by ordinal).
    var dataTable = new DataTable();
    dataTable.Columns.Add("id", typeof(string));
    dataTable.Columns.Add("category_id", typeof(long));
    dataTable.Columns.Add("seq_id", typeof(string));
    dataTable.Columns.Add("season", typeof(string));
    dataTable.Columns.Add("datetime", typeof(DateTime));
    dataTable.Columns.Add("image_id", typeof(string));
    dataTable.Columns.Add("location", typeof(string));

    foreach (var annotation in annotations)
    {
        var row = dataTable.NewRow();
        row["id"] = annotation.Id;
        row["category_id"] = annotation.CategoryId;
        row["seq_id"] = annotation.SeqId;
        row["season"] = annotation.Season;
        // Use the recorded wall-clock value (as BulkInsertImages does). LocalDateTime would
        // convert into the notebook host's time zone whenever the source carries an offset.
        row["datetime"] = annotation.Datetime.DateTime;
        row["image_id"] = annotation.ImageId;
        row["location"] = annotation.Location;
        dataTable.Rows.Add(row);
    }

    await sqlConn.OpenAsync();
    try
    {
        using (var bulkCpy = new SqlBulkCopy(sqlConn))
        {
            bulkCpy.DestinationTableName = "annotations";
            await bulkCpy.WriteToServerAsync(dataTable);
        }
    }
    finally
    {
        // Always close: sqlConn is shared, and leaving it open would make the next OpenAsync throw.
        await sqlConn.CloseAsync();
    }
}


StatementMeta(DefaultPool, 28, 213, Finished, Available)

In [214]:
/// <summary>
/// Bulk-copies the train and val location lists into the <c>train</c> and <c>val</c> tables.
/// </summary>
/// <param name="splits">
/// Deserialized splits document (Newtonsoft dynamic); expected to expose
/// <c>splits.train</c> and <c>splits.val</c> arrays of location values.
/// </param>
private async Task BulkInsertSplits(dynamic splits)
{
    // Read from the argument itself rather than a script-level variable. The casts make
    // the helper calls statically bound.
    DataTable trainTable = BuildLocationTable((System.Collections.IEnumerable)splits.splits.train);
    DataTable valTable = BuildLocationTable((System.Collections.IEnumerable)splits.splits.val);

    await sqlConn.OpenAsync();
    try
    {
        await CopyToTableAsync("train", trainTable);
        await CopyToTableAsync("val", valTable);
    }
    finally
    {
        // Always close: sqlConn is shared, and leaving it open would make the next OpenAsync throw.
        await sqlConn.CloseAsync();
    }

    // Builds a single-column (location) DataTable from a JSON array of location values.
    DataTable BuildLocationTable(System.Collections.IEnumerable locations)
    {
        var table = new DataTable();
        table.Columns.Add("location", typeof(string));

        foreach (var loc in locations)
        {
            var row = table.NewRow();
            // The array elements are JValue tokens; ToString() yields the raw string value.
            row["location"] = loc.ToString();
            table.Rows.Add(row);
        }

        return table;
    }

    // Copies one prepared table into the named destination over the already-open sqlConn.
    async Task CopyToTableAsync(string destinationTable, DataTable table)
    {
        using (var bulkCpy = new SqlBulkCopy(sqlConn))
        {
            bulkCpy.DestinationTableName = destinationTable;
            await bulkCpy.WriteToServerAsync(table);
        }
    }
}

StatementMeta(DefaultPool, 28, 214, Finished, Available)

In [215]:
InitStorageAndDb();
await CreateTablesAsync();

// Per-season metadata files, S01 through S11, in the blob "metadata" directory.
var seasonFiles = new List<string>()
{
    "SnapshotSerengetiS01.json",
    "SnapshotSerengetiS02.json",
    "SnapshotSerengetiS03.json",
    "SnapshotSerengetiS04.json",
    "SnapshotSerengetiS05.json",
    "SnapshotSerengetiS06.json",
    "SnapshotSerengetiS07.json",
    "SnapshotSerengetiS08.json",
    "SnapshotSerengetiS09.json",
    "SnapshotSerengetiS10.json",
    "SnapshotSerengetiS11.json"
};

// Index loop: the first-file check is a constant-time comparison instead of an
// IndexOf lookup on every iteration.
for (var seasonIndex = 0; seasonIndex < seasonFiles.Count; seasonIndex++)
{
    var seasonBlob = blobDirectory.GetBlockBlobReference(seasonFiles[seasonIndex]);
    var serengetiData = await ReadJsonFile<SerengetiData>(seasonBlob);

    // Categories are loaded only from the first season file, so the table is filled once.
    // NOTE(review): assumes every season shares the first file's category list — confirm.
    if (seasonIndex == 0)
    {
        await BulkInsertCategories(serengetiData.Categories);
    }

    await BulkInsertImages(serengetiData.Images);
    await BulkInsertAnnotations(serengetiData.Annotations);
}

// The splits file supplies the location lists written to the train and val tables.
var blob = blobDirectory.GetBlockBlobReference("SnapshotSerengetiSplits_v0.json");
var splitData = await ReadJsonFile<dynamic>(blob);

await BulkInsertSplits(splitData);

StatementMeta(DefaultPool, 28, 215, Finished, Available)

Table images created successfully.
Table categories created successfully.
Table annotations created successfully.
Table train created successfully.
Table val created successfully.
