Setup necessary modules

In [26]:
%use dataframe
%use kandy

Setup file columns and folders

In [27]:
import java.nio.file.Paths
import java.nio.file.Files
import java.io.BufferedInputStream
import org.apache.commons.csv.CSVFormat
import kotlin.io.path.inputStream

// Typed accessors for the columns read from the weather CSV files.
// DATE is compared against years (e.g. 1900) and plotted as "Year" in later cells.
val date by column<Int>("DATE")
val count by column<Int>("count")
val tavg by column<Double>("TAVG")
val tmin by column<Double>("TMIN")
val tmax by column<Double>("TMAX")
// Accessors for derived columns; with no explicit name the column name is the property name.
val average by column<Double>()
val max by column<Double>()
val min by column<Double>()

val name by column<String>("NAME")
val latitude by column<Double>("LATITUDE")
val longitude by column<Double>("LONGITUDE")
val elevation by column<Double>("ELEVATION")
val snow by column<Double>("SNOW")

// Resolve the project root. When the kernel is started from the notebook's own
// directory, walk back up to the root; otherwise use the working directory as is.
// The working directory normally has no trailing separator, so the previous
// string replace on ".../first/" never matched; a Path suffix check handles
// both cases and cannot cut the path in the middle.
val notebookSubPath = Paths.get("src", "main", "kotlin", "org", "data", "first")
val workingDir = Paths.get(System.getProperty("user.dir")).normalize()
val projectFolder = if (workingDir.endsWith(notebookSubPath)) {
    (1..notebookSubPath.nameCount).fold(workingDir) { path, _ -> path.parent ?: path }
} else {
    workingDir
}
println(projectFolder)
val rawDataFolder = projectFolder.resolve("raw-data")
println(rawDataFolder)
val rawDataYearlyDataFolder = rawDataFolder.resolve("yearly-weather")
println(rawDataYearlyDataFolder)
val refinedDataFolder = projectFolder.resolve("refined-data")
println(refinedDataFolder)
// Output folders are created up front; the raw-data folders are expected to exist already.
val refinedDataYearlyDataFolder = refinedDataFolder.resolve("yearly-data")
refinedDataYearlyDataFolder.toFile().mkdirs()
println(refinedDataYearlyDataFolder)
val presentableDataFolder = projectFolder.resolve("presentable-data")
presentableDataFolder.toFile().mkdirs()
println(presentableDataFolder)

val highElevationStart = 2000.0

/Users/jansrnicek/IdeaProjects/article.1
/Users/jansrnicek/IdeaProjects/article.1/raw-data
/Users/jansrnicek/IdeaProjects/article.1/raw-data/yearly-weather
/Users/jansrnicek/IdeaProjects/article.1/refined-data
/Users/jansrnicek/IdeaProjects/article.1/refined-data/yearly-data
/Users/jansrnicek/IdeaProjects/article.1/presentable-data


Count all raw data samples

In [29]:

// Total number of lines across all raw yearly files (every line is counted,
// including any header line). The directory stream and each per-file line
// stream are closed after use, and lines are streamed instead of being
// loaded into memory as a list.
Files.list(rawDataYearlyDataFolder).use { paths ->
    paths
        .map { file -> Files.lines(file).use { lines -> lines.count().toInt() } }
        .toList()
        .sum()
}

2658574

Refine the data to only the samples that have minimum, maximum and average temperatures available

In [30]:

// Keep only rows where TAVG, TMIN and TMAX are all present, non-null and not NaN.
// Raw files are processed 1000 at a time; each batch is written to its own
// "yearly_data_part_<n>.csv" file in the refined folder.
var counter = 0
// Materialise the directory listing inside `use` so the directory stream is closed.
Files.list(rawDataYearlyDataFolder).use { paths -> paths.toList() }
    .chunked(1000)
    .forEach { chunk ->
        val outFile = refinedDataYearlyDataFolder.resolve("yearly_data_part_$counter.csv")
        val refined = chunk
            .map { file ->
                // One short-circuit predicate instead of nine passes: each typed
                // accessor is only read after the key and null checks succeeded.
                DataFrame.read(file.toFile()).filter { row ->
                    row.containsKey("TAVG") && row.get("TAVG") != null && !tavg.getValue(row).isNaN() &&
                        row.containsKey("TMIN") && row.get("TMIN") != null && !tmin.getValue(row).isNaN() &&
                        row.containsKey("TMAX") && row.get("TMAX") != null && !tmax.getValue(row).isNaN()
                }
            }
            .concat()
        Files.writeString(outFile, refined.toCsv(CSVFormat.DEFAULT))
        counter += 1
    }


Count data samples after refinement

In [31]:

// Total number of lines across all refined part files (every line is counted,
// including any header line). The directory stream and each per-file line
// stream are closed after use, and lines are streamed instead of being
// loaded into memory as a list.
Files.list(refinedDataYearlyDataFolder).use { paths ->
    paths
        .map { file -> Files.lines(file).use { lines -> lines.count().toInt() } }
        .toList()
        .sum()
}

868331

Aggregate all files into a single file with the count of data samples and the min, max and average temperatures

In [32]:


// Per-year sums carried between the two aggregation stages.
val tavgSum by column<Double>()
val tminSum by column<Double>()
val tmaxSum by column<Double>()

// Writes one row per year (DATE) with the sample count and the mean of
// TAVG/TMIN/TMAX over all refined files.
// Files are aggregated five at a time. Each chunk contributes per-year sums and
// counts, and the means are computed only at the end as sum / count. Averaging
// the per-chunk means instead would weight every chunk equally no matter how
// many samples it holds for that year.
Files.writeString(
    presentableDataFolder.resolve("basic-temparatures.csv"),
    Files.list(refinedDataYearlyDataFolder).use { paths -> paths.toList() }
        .chunked(5)
        .map { chunk ->
            chunk.map { file -> BufferedInputStream(file.inputStream()).use { stream -> DataFrame.readCSV(stream) } }
                .concat()
                .groupBy(date)
                .sortBy(date)
                .aggregate {
                    count() into count
                    sum(tavg) into tavgSum
                    sum(tmin) into tminSum
                    sum(tmax) into tmaxSum
                }
        }
        .concat()
        .convert(count).with { it.toInt() }
        .groupBy(date)
        .sortBy(date)
        .aggregate {
            sum(count) into count
            sum(tavgSum) into tavgSum
            sum(tminSum) into tminSum
            sum(tmaxSum) into tmaxSum
        }
        .add(average) { tavgSum.getValue(it) / count.getValue(it) }
        .add(min) { tminSum.getValue(it) / count.getValue(it) }
        .add(max) { tmaxSum.getValue(it) / count.getValue(it) }
        // Same columns and order as before: DATE, count, average, min, max.
        .select(date, count, average, min, max)
        .toCsv()
)

/Users/jansrnicek/IdeaProjects/article.1/presentable-data/basic-temparatures.csv

Plot min/max temperatures

In [33]:

// Line chart of the yearly `min` and `max` columns of basic-temparatures.csv,
// one line layer per column, both on a fixed 0..20 y scale.
run {
    val source = presentableDataFolder.resolve("basic-temparatures.csv").toFile()
    val yearlyStats = DataFrame.readCSV(BufferedInputStream(source.inputStream()))
    val valueRange = 0.0..20.0

    yearlyStats.plot {
        // Layer 1: the `min` column; this layer also carries the axis titles.
        line {
            x(date) {
                axis {
                    name = "Year"
                    breaks(format = "{d}")
                }
            }
            y(min) {
                scale = continuous(valueRange)
                axis.name = "Temperatures"
            }
            color(min) {
                scale = continuous(Color.BLUE..Color.RED)
                legend { name = "Temparature in Celsius" }
            }
        }
        // Layer 2: the `max` column.
        line {
            x(date)
            y(max) { scale = continuous(valueRange) }
            color(max) {
                scale = continuous(Color.BLUE..Color.RED)
                legend { name = "Temparature in Celsius" }
            }
        }

        layout.size = Pair(1500, 800)
    }
}

Plot weather stations

In [34]:

// Line chart of the yearly `count` column of basic-temparatures.csv,
// labelled "Weather stations" and coloured from red to green by the same value.
run {
    val source = presentableDataFolder.resolve("basic-temparatures.csv").toFile()
    val yearlyStats = DataFrame.readCSV(BufferedInputStream(source.inputStream()))

    yearlyStats.plot {
        line {
            x(date) {
                axis {
                    name = "Year"
                    breaks(format = "{d}")
                }
            }
            y(count) { axis.name = "Weather stations" }
            color(count) {
                scale = continuous(Color.RED..Color.GREEN)
                legend { name = "Weather stations" }
            }
        }

        layout.size = Pair(1500, 800)
    }
}

Put data for all stations into GPX format so we can feed it into a map

In [35]:
/**
 * A single GPX waypoint.
 *
 * [toString] renders the `<wpt>` element with [latitude] and [longitude] as
 * attributes and [name] as a nested `<name>` element. The name is XML-escaped
 * so that names containing `&`, `<` or `>` still produce well-formed GPX.
 */
data class GpxEntry(val name: String, val latitude: Double, val longitude: Double) {
    override fun toString(): String {
        return """<wpt lat="${latitude}" lon="${longitude}">
		            <name>${escapeXml(name)}</name>
	              </wpt>"""
    }

    /** Replaces the five XML special characters; `&` goes first so entities are not escaped twice. */
    private fun escapeXml(text: String): String =
        text.replace("&", "&amp;")
            .replace("<", "&lt;")
            .replace(">", "&gt;")
            .replace("\"", "&quot;")
            .replace("'", "&apos;")
}

/**
 * A GPX 1.1 document wrapping a list of waypoints.
 *
 * [toString] renders the XML declaration and a `<gpx>` root element containing
 * the string form of every entry in [points], one per line. The root carries
 * the `creator` attribute, which the GPX 1.1 schema requires next to `version`.
 */
data class GpxTemplate(val points: List<GpxEntry>) {
    override fun toString(): String {
        return """<?xml version="1.0" encoding="UTF-8"?>
                    <gpx version="1.1" creator="weather-stations-notebook" xmlns="http://www.topografix.com/GPX/1/1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.topografix.com/GPX/1/1 http://www.topografix.com/GPX/1/1/gpx.xsd">
	                    ${points.joinToString("\n")}
                    </gpx>"""
    }
}

In [36]:


// One GpxEntry per distinct (NAME, LATITUDE, LONGITUDE) combination found in the
// refined files, written out as a GPX document.
// Files are read five at a time and reduced to their distinct key combinations
// before the chunks are combined. The directory stream and every file stream
// are closed as soon as they have been consumed.
val points = Files.list(refinedDataYearlyDataFolder).use { paths -> paths.toList() }
    .chunked(5)
    .map { chunk ->
        chunk.map { file -> BufferedInputStream(file.inputStream()).use { stream -> DataFrame.readCSV(stream) } }
            .concat()
            .groupBy(name, latitude, longitude)
            .count()
    }
    .concat()
    .groupBy(name, latitude, longitude)
    .count()
    .map {
        GpxEntry(
            name = name.getValue(it), latitude = latitude.getValue(it), longitude = longitude.getValue(it)
        )
    }

Files.writeString(presentableDataFolder.resolve("gpx-visualization.gpx"), GpxTemplate(points).toString())


/Users/jansrnicek/IdeaProjects/article.1/presentable-data/gpx-visualization.gpx

In [37]:
// Lists every (NAME, DATE) pair from 1900 onwards that occurs more than once in
// the refined files, newest year first, and writes it to duplicate-values.csv.
// Counting happens per chunk of five files and the partial counts are summed.
// The directory stream and every file stream are closed after use.
Files.writeString(
    presentableDataFolder.resolve("duplicate-values.csv"),
    Files.list(refinedDataYearlyDataFolder).use { paths -> paths.toList() }
        .chunked(5)
        .map { chunk ->
            chunk.map { file ->
                BufferedInputStream(file.inputStream()).use { stream -> DataFrame.readCSV(stream) }
                    .filter { date.getValue(it) >= 1900 }
            }
                .concat()
                .groupBy(name, date)
                .aggregate {
                    count() into count
                }
        }
        .concat()
        .convert(count).with { it.toInt() }
        .groupBy(name, date)
        .aggregate {
            sum(count) into count
        }
        .filter { count.getValue(it) > 1 } // more than one row for the same station and year
        .sortByDesc(date)
        .toCsv()
)

/Users/jansrnicek/IdeaProjects/article.1/presentable-data/duplicate-values.csv

In [38]:
// Preview: the first ten rows of duplicate-values.csv.
run {
    val duplicatesFile = presentableDataFolder.resolve("duplicate-values.csv")
    val duplicates = DataFrame.readCSV(BufferedInputStream(duplicatesFile.inputStream()))
    duplicates.take(10)
}

NAME,DATE,count
SAVANNAH HILTON HEAD INTERNATIONAL AI...,2023,2
"MARYBOROUGH, AS",2023,2
"TERRE HAUTE HULMAN REGIONAL AIRPORT, ...",2023,2
"APALACHICOLA AIRPORT, FL US",2023,2
"VICTORIA REGIONAL AIRPORT, TX US",2023,2
"SAN JOSE, CA US",2023,2
"CORNER BROOK, NL CA",2023,2
"YORKTON, SK CA",2023,2
"BAIE COMEAU, QC CA",2023,2
"TROIS RIVIERES, QC CA",2023,2


In [39]:
// First year of the period that the "solid data" cells look at.
val limitYear: Int = 1980
// Number of years from limitYear through 2023, both ends included.
val expectedValueCount: Int = (limitYear..2023).count()
// Tolerated number of missing yearly values per station (one year missing).
val error: Int = 1
val solidDataFileName: String = "solid-data-since-${limitYear}.csv"

In [40]:

// Names of stations with at least (expectedValueCount - error) rows dated
// limitYear or later, ordered by row count descending. Duplicate (NAME, DATE)
// rows are still counted here, so this is only a pre-selection.
// The directory stream and every file stream are closed after use.
val potentialCandidates = Files.list(refinedDataYearlyDataFolder).use { paths -> paths.toList() }
    .chunked(5)
    .map { chunk ->
        chunk.map { file ->
            BufferedInputStream(file.inputStream()).use { stream -> DataFrame.readCSV(stream) }
                .filter { date.getValue(it) >= limitYear }
        }
            .concat()
            .groupBy(name)
            .aggregate {
                count() into count
            }
    }
    .concat()
    .convert(count).with { it.toInt() }
    .groupBy(name)
    .aggregate {
        sum(count) into count
    }
    .filter { count.getValue(it) >= (expectedValueCount - error) } // enough rows since limitYear
    .sortByDesc(count)
    .select(name)
    .convert(name).with { it.toString() }
    .map { name.getValue(it) }

// Names of the pre-selected stations that have at least
// (expectedValueCount - error) DISTINCT years of data from limitYear onwards.
val weatherStations = run {
    // Hash lookup instead of scanning the candidate list for every row.
    val candidateNames = potentialCandidates.toHashSet()

    Files.list(refinedDataYearlyDataFolder).use { paths -> paths.toList() }
        .chunked(5)
        .map { chunk ->
            chunk.map { file ->
                BufferedInputStream(file.inputStream()).use { stream -> DataFrame.readCSV(stream) }
                    .filter { candidateNames.contains(name.getValue(it)) }
                    // The year must be compared with limitYear. It used to be compared
                    // with the required row count (expectedValueCount - error), which
                    // every year passes, so years before limitYear were counted too.
                    .filter { date.getValue(it) >= limitYear }
            }
                .concat()
        }
        .concat()
        .distinctBy(name, date) // one row per station and year
        .groupBy(name)
        .sortBy(name, date)
        .aggregate {
            count() into count
        }
        .convert(count).with { it.toInt() }
        .filter { count.getValue(it) >= (expectedValueCount - error) }
        .select(name)
        .convert(name).with { it.toString() }
        .map { name.getValue(it) }
}


// Writes every refined row dated limitYear or later that belongs to one of the
// selected weatherStations into the solid-data file.
run {
    // Hash lookup instead of scanning the station list for every row.
    val selectedStations = weatherStations.toHashSet()

    Files.writeString(
        presentableDataFolder.resolve(solidDataFileName),
        Files.list(refinedDataYearlyDataFolder).use { paths -> paths.toList() }
            .chunked(5)
            .map { chunk ->
                chunk.map { file ->
                    BufferedInputStream(file.inputStream()).use { stream -> DataFrame.readCSV(stream) }
                        .filter { selectedStations.contains(name.getValue(it)) }
                        .filter { date.getValue(it) >= limitYear }
                }
                    .concat()
            }
            .concat()
            .toCsv()
    )
}

/Users/jansrnicek/IdeaProjects/article.1/presentable-data/solid-data-since-1980.csv

In [41]:
// Yearly means of TMIN/TMAX/TAVG over the solid-data file, written to
// presentableFileName as columns DATE, min, max, average.
val presentableFileName = "yearly-temparatures-since-$limitYear.csv"

Files.writeString(
    // Use the declared file name instead of repeating the literal.
    presentableDataFolder.resolve(presentableFileName),
    BufferedInputStream(presentableDataFolder.resolve(solidDataFileName).inputStream())
        .use { stream -> DataFrame.readCSV(stream) } // close the input once it is parsed
        .groupBy(date)
        .aggregate {
            mean(tmin) into min
            mean(tmax) into max
            mean(tavg) into average
        }
        .toCsv()
)

/Users/jansrnicek/IdeaProjects/article.1/presentable-data/yearly-temparatures-since-1980.csv

Plot solid data since 1980 (`limitYear`)

In [42]:
// Line chart of the yearly `min` and `max` columns of the
// "yearly-temparatures-since-<limitYear>.csv" file, both on a fixed 0..20 y scale.
run {
    val source = presentableDataFolder.resolve("yearly-temparatures-since-$limitYear.csv").toFile()
    val yearlyStats = DataFrame.readCSV(BufferedInputStream(source.inputStream()))
    val valueRange = 0.0..20.0

    yearlyStats.plot {
        // Layer 1: the `min` column; this layer also carries the axis titles.
        line {
            x(date) {
                axis {
                    name = "Year"
                    breaks(format = "{d}")
                }
            }
            y(min) {
                scale = continuous(valueRange)
                axis.name = "Temperatures"
            }
            color(min) {
                scale = continuous(Color.BLUE..Color.RED)
                legend { name = "Temparature in Celsius" }
            }
        }
        // Layer 2: the `max` column.
        line {
            x(date)
            y(max) { scale = continuous(valueRange) }
            color(max) {
                scale = continuous(Color.BLUE..Color.RED)
                legend { name = "Temparature in Celsius" }
            }
        }

        layout.size = Pair(1500, 800)
    }
}

In [43]:
import org.jetbrains.letsPlot.Geom
import org.jetbrains.letsPlot.annotations.layerLabels
import org.jetbrains.letsPlot.scale.guideLegend

// Elevation bucket index (elevation / 500, truncated) and its "<from> - <to>" label.
val group by column<Int>()
val groupName by column<String>()

// Bar chart of the average number of rows per year in each 500 m elevation bucket
// of the solid-data file.
BufferedInputStream(presentableDataFolder.resolve(solidDataFileName).inputStream())
    .use { stream -> DataFrame.readCSV(stream) } // close the input once it is parsed
    // Truncate explicitly so that e.g. 0..499 lands in bucket 0, as the label says.
    // NOTE(review): the library's generic Double -> Int convert is believed to round
    // to nearest, which would shift 250..749 into bucket 1 — confirm for the version in use.
    .add(group) { (elevation.getValue(it) / 500).toInt() }
    .add(groupName) {
        val lowerBound = group.getValue(it) * 500
        "$lowerBound - ${lowerBound + 499}"
    }
    .groupBy(group, groupName, date)
    .aggregate {
        count() into count
    }
    .sortBy(group)
    .groupBy(groupName)
    .aggregate {
        mean(count) into count
    }
    .convert(count).to<Int>()
    .plot {
        x(groupName) {
            axis {
                name = "Elevation group"
                breaks(format = "{} m")
            }
        }
        bars {
            y(count) {
                axis.name = "Number of stations"
            }
        }
        layout.size = Pair(1000, 450)
    }
   

In [44]:
// Elevation bucket index (elevation / 500, truncated) and its "<from> - <to>" label.
val group by column<Int>()
val groupName by column<String>()

// Per elevation bucket and year: row count and mean TMIN/TMAX of the solid-data
// file, written to "elevation-groups-<limitYear>".
Files.writeString(
    presentableDataFolder.resolve("elevation-groups-$limitYear"),
    BufferedInputStream(presentableDataFolder.resolve(solidDataFileName).inputStream())
        .use { stream -> DataFrame.readCSV(stream) } // close the input once it is parsed
        // Truncate explicitly so that e.g. 0..499 lands in bucket 0, as the label says.
        // NOTE(review): the library's generic Double -> Int convert is believed to round
        // to nearest, which would shift 250..749 into bucket 1 — confirm for the version in use.
        .add(group) { (elevation.getValue(it) / 500).toInt() }
        .add(groupName) {
            val lowerBound = group.getValue(it) * 500
            "$lowerBound - ${lowerBound + 499}"
        }
        .groupBy(groupName, group, date)
        .aggregate {
            count() into count
            mean(tmin) into min
            mean(tmax) into max
        }
        .toCsv()
)
    

/Users/jansrnicek/IdeaProjects/article.1/presentable-data/elevation-groups-1980

In [45]:
// Yearly `min` value per elevation group, one coloured line per group.
// The "2500 - 2999" and "3000 - 3499" groups are left out of the chart.
run {
    val hiddenGroups = setOf("2500 - 2999", "3000 - 3499")
    val groupsFile = presentableDataFolder.resolve("elevation-groups-$limitYear")

    val perGroup = DataFrame.readCSV(BufferedInputStream(groupsFile.inputStream()))
        .sortBy(group)
        .groupBy(groupName)
        .filter { groupName.getValue(it) !in hiddenGroups }

    perGroup.plot {
        line {
            x(date) { axis.name = "Year" }
            y(min) { axis.name = "Minimum Temparature" }
            color(groupName) {
                legend { name = "Elevation group" }
            }
        }
        layout.size = 1000 to 450
    }
}


In [46]:
// Yearly `max` value per elevation group, one coloured line per group.
// The "2500 - 2999" and "3000 - 3499" groups are left out of the chart.
run {
    val hiddenGroups = setOf("2500 - 2999", "3000 - 3499")
    val groupsFile = presentableDataFolder.resolve("elevation-groups-$limitYear")

    val perGroup = DataFrame.readCSV(BufferedInputStream(groupsFile.inputStream()))
        .sortBy(group)
        .groupBy(groupName)
        .filter { groupName.getValue(it) !in hiddenGroups }

    perGroup.plot {
        line {
            x(date) { axis.name = "Year" }
            y(max) { axis.name = "Maximum Temparature" }
            color(groupName) {
                legend { name = "Elevation group" }
            }
        }
        layout.size = 1000 to 450
    }
}

In [47]:
// Mean SNOW value and row count per elevation bucket and year, grouped by bucket
// label. Rows without a usable SNOW value are dropped first, and the
// "2500 - 2999" and "3000 - 3499" buckets are excluded.
val snowFallData =
    BufferedInputStream(presentableDataFolder.resolve(solidDataFileName).inputStream())
        .use { stream -> DataFrame.readCSV(stream) } // close the input once it is parsed
        .filter { it.get("SNOW") != null && !snow.getValue(it).isNaN() }
        // Truncate explicitly so that e.g. 0..499 lands in bucket 0, as the label says.
        // NOTE(review): the library's generic Double -> Int convert is believed to round
        // to nearest, which would shift 250..749 into bucket 1 — confirm for the version in use.
        .add(group) { (elevation.getValue(it) / 500).toInt() }
        .add(groupName) {
            val lowerBound = group.getValue(it) * 500
            "$lowerBound - ${lowerBound + 499}"
        }
        .groupBy(groupName, group, date)
        .aggregate {
            count() into count
            mean(snow) into snow
        }
        .sortBy(group)
        .groupBy(groupName)
        .filter {
            groupName.getValue(it) != "2500 - 2999" && groupName.getValue(it) != "3000 - 3499"
        }

In [48]:
// Yearly mean SNOW value per elevation group, one coloured line per group,
// on a fixed 0..3000 y scale.
run {
    val snowRange = 0.0..3000.0

    snowFallData.plot {
        line {
            x(date) { axis.name = "Year" }
            y(snow) {
                scale = continuous(snowRange)
                axis.name = "Average snowfall"
            }
            color(groupName) {
                legend { name = "Average snowfall" }
            }
        }
        layout.size = 1000 to 450
    }
}

In [49]:
val prcp by column<Double>("PRCP")

// Mean PRCP per elevation bucket and year over the solid-data file, written to
// "precipitation-since-<limitYear>.csv".
Files.writeString(
    presentableDataFolder.resolve("precipitation-since-$limitYear.csv"),
    BufferedInputStream(presentableDataFolder.resolve(solidDataFileName).inputStream())
        .use { stream -> DataFrame.readCSV(stream) } // close the input once it is parsed
        // Drop rows without a usable PRCP value. NaN is excluded as well, the same
        // way the SNOW cell does, so one NaN cannot turn a yearly mean into NaN.
        .filter { it.get("PRCP") != null && !prcp.getValue(it).isNaN() }
        // Truncate explicitly so that e.g. 0..499 lands in bucket 0, as the label says.
        // NOTE(review): the library's generic Double -> Int convert is believed to round
        // to nearest, which would shift 250..749 into bucket 1 — confirm for the version in use.
        .add(group) { (elevation.getValue(it) / 500).toInt() }
        .add(groupName) {
            val lowerBound = group.getValue(it) * 500
            "$lowerBound - ${lowerBound + 499}"
        }
        .groupBy(groupName, group, date)
        .aggregate {
            mean(prcp) into prcp
        }
        .toCsv()
)
    

/Users/jansrnicek/IdeaProjects/article.1/presentable-data/precipitation-since-1980.csv

In [50]:
// Yearly mean PRCP value per elevation group, one coloured line per group,
// on a fixed 0..3000 y scale.
run {
    val precipitationFile = presentableDataFolder.resolve("precipitation-since-$limitYear.csv")
    val precipitation = DataFrame.readCSV(BufferedInputStream(precipitationFile.inputStream()))
    val rainRange = 0.0..3000.0

    precipitation.plot {
        line {
            x(date) { axis.name = "Year" }
            y(prcp) {
                scale = continuous(rainRange)
                axis.name = "Average rain"
            }
            color(groupName) {
                legend { name = "Average rain" }
            }
        }
        layout.size = 1000 to 450
    }
}
    