Skip to content

feat: Neo4j CSV to RDF Converter #7545

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions contrib/neo4j-converter/Neo4jCSVToRDFConverter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package main

import (
"bufio"
"bytes"
"encoding/csv"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"time"
)

var (
inputPath = flag.String("input", "", "Please provide the input csv file.")
outputPath = flag.String("output", "", "Where to place the output?")
)

func main() {
flag.Parse()
//check input path length
if len(*inputPath) == 0 {
log.Fatal("Please set the input argument.")
}
//check output path length
if len(*outputPath) == 0 {
log.Fatal("Please set the output argument.")
}
fmt.Printf("CSV to convert: %q ?[y/n]", *inputPath)

var inputConf, outputConf string
check2(fmt.Scanf("%s", &inputConf))

fmt.Printf("Output directory wanted: %q ?[y/n]", *outputPath)
check2(fmt.Scanf("%s", &outputConf))

if inputConf != "y" || outputConf != "y" {
fmt.Println("Please update the directories")
return
}

//open the file
ifile, err := os.Open(*inputPath)
check(err)
defer ifile.Close()
//log the start time
ts := time.Now().UnixNano()

//create output file in append mode
outputName := filepath.Join(*outputPath, fmt.Sprintf("converted_%d.rdf", ts))
oFile, err := os.OpenFile(outputName, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
check(err)
defer oFile.Close()
//process the file
check(processNeo4jCSV(ifile, oFile))
fmt.Printf("Finished writing %q", outputName)

}

func processNeo4jCSV(r io.Reader, w io.Writer) error {

scanner := bufio.NewScanner(r)
scanner.Split(bufio.ScanLines)
var text, rdfLines bytes.Buffer

header := make(map[int]string)
positionOfStart, startPositionOfProperty := -1, -1

//read header
readHeader := func() {
h := csv.NewReader(strings.NewReader(scanner.Text()))
line, _ := h.Read()
//read headers
for position, fieldName := range line {
header[position] = fieldName

if fieldName == "_start" {
positionOfStart = position
} else if fieldName == "_type" {
startPositionOfProperty = position + 1
}
}
}

// Scan and read the header.
scanner.Scan()
readHeader()
//ensure that header exists
if positionOfStart == -1 {
return errors.New("column '_start' is absent in file")
}

// Read the actual data.
for scanner.Scan() {
//parse csv
text.WriteString(scanner.Text() + "\n")
d := csv.NewReader(strings.NewReader(text.String()))
records, err := d.ReadAll()
check(err)

linkStartNode := ""
linkEndNode := ""
linkName := ""
facets := make(map[string]string)

line := records[0]
for position := 0; position < len(line); position++ {

// This is an _id node.
if len(line[0]) > 0 {
bn := fmt.Sprintf("<_:k_%s>", line[0])
if position < positionOfStart && position > 0 {
//write non-facet data
rdfLines.WriteString(fmt.Sprintf("%s <%s> \"%s\" .\n",
bn, header[position], line[position]))
}
continue
}
// Handle relationship data.
if position >= positionOfStart {
if header[position] == "_start" {
linkStartNode = fmt.Sprintf("<_:k_%s>", line[position])
} else if header[position] == "_end" {
linkEndNode = fmt.Sprintf("<_:k_%s>", line[position])
} else if header[position] == "_type" {
linkName = fmt.Sprintf("<%s>", line[position])
} else if position >= startPositionOfProperty {
//collect facets
facets[header[position]] = line[position]
}
continue
}
}
//write the facets
if len(linkName) > 0 {
facetLine := ""
atleastOneFacetExists := false
for facetName, facetValue := range facets {
if len(facetValue) == 0 {
continue
}
//strip [ ], and assume only one value
facetValue = strings.Replace(facetValue, "[", "", 1)
facetValue = strings.Replace(facetValue, "]", "", 1)
if atleastOneFacetExists {
//insert a comma to separate multiple facets
facetLine = fmt.Sprintf("%s, ", facetLine)
}
//write the actual facet
facetLine = fmt.Sprintf("%s %s=%s", facetLine, facetName, facetValue)
atleastOneFacetExists = true
}
if atleastOneFacetExists {
//wrap all facets with round brackets
facetLine = fmt.Sprintf("( %s )", facetLine)
}
rdfLines.WriteString(fmt.Sprintf("%s %s %s %s .\n",
linkStartNode, linkName, linkEndNode, facetLine))
}

text.Reset()
//write a chunk when ready
if rdfLines.Len() > 100<<20 {
// Flush the writes and reset the rdfLines
check2(w.Write(rdfLines.Bytes()))
rdfLines.Reset()
}
}
check2(w.Write(rdfLines.Bytes()))
return nil
}
func check2(_ interface{}, err error) {
if err != nil {
log.Fatal(err)
}
}
func check(err error) {
if err != nil {
log.Fatal(err)
}
}
75 changes: 75 additions & 0 deletions contrib/neo4j-converter/Neo4jConverter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"bytes"
"fmt"
"github.com/sergi/go-diff/diffmatchpatch"
"github.com/stretchr/testify/require"
"io/ioutil"
"strings"
"testing"
)

func TestParsingHeader(t *testing.T) {
i := strings.NewReader("my request")
buf := new(bytes.Buffer)
require.Error(t, processNeo4jCSV(i, buf), "column '_start' is absent in file")
}

func TestSingleLineFileString(t *testing.T) {
header := `"_id","_labels","born","name","released","tagline"` +
`,"title","_start","_end","_type","roles"`
detail := `"188",":Movie","","","1999","Welcome to the Real World","The Matrix",,,,`
fileLines := fmt.Sprintf("%s\n%s", header, detail)
output := `<_:k_188> <_labels> ":Movie" .
<_:k_188> <born> "" .
<_:k_188> <name> "" .
<_:k_188> <released> "1999" .
<_:k_188> <tagline> "Welcome to the Real World" .
<_:k_188> <title> "The Matrix" .
`
i := strings.NewReader(fileLines)
buf := new(bytes.Buffer)
processNeo4jCSV(i, buf)
require.Equal(t, buf.String(), output)
}

func TestWholeFile(t *testing.T) {
goldenFile := "./output.rdf"
inBuf, _ := ioutil.ReadFile("./example.csv")
i := strings.NewReader(string(inBuf))
buf := new(bytes.Buffer)
processNeo4jCSV(i, buf)
//check id
require.Contains(t, buf.String(), "<_:k_188> <_labels> \":Movie\" .")
//check facets
require.Contains(t, buf.String(),
"<_:k_191> <ACTED_IN> <_:k_188> ( roles=\"Morpheus\" )")
//check link w/o facets
require.Contains(t, buf.String(), "<_:k_193> <DIRECTED> <_:k_188>")

//check full file
expected, err := ioutil.ReadFile(goldenFile)
if err != nil {
// Handle error
}
isSame := bytes.Equal(expected, buf.Bytes())
if !isSame {
fmt.Println("Printing comparison")
dmp := diffmatchpatch.New()
diffs := dmp.DiffMain(string(expected), buf.String(), true)
fmt.Println(dmp.DiffPrettyText(diffs))
}
require.True(t, isSame)

}

func BenchmarkSampleFile(b *testing.B) {
inBuf, _ := ioutil.ReadFile("./example.csv")
i := strings.NewReader(string(inBuf))
buf := new(bytes.Buffer)
for k := 0; k < b.N; k++ {
processNeo4jCSV(i, buf)
buf.Reset()
}
}
16 changes: 16 additions & 0 deletions contrib/neo4j-converter/example.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"_id","_labels","born","name","released","tagline","title","_start","_end","_type","roles"
"188",":Movie","","","1999","Welcome to the Real World","The Matrix",,,,
"189",":Person","1964","Keanu Reeves","","","",,,,
"190",":Person","1967","Carrie-Anne Moss","","","",,,,
"191",":Person","1961","Laurence Fishburne","","","",,,,
"192",":Person","1960","Hugo Weaving","","","",,,,
"193",":Person","1967","Lilly Wachowski","","","",,,,
"194",":Person","1965","Lana Wachowski","","","",,,,
"195",":Person","1952","Joel Silver","","","",,,,
,,,,,,,"189","188","ACTED_IN","[""Neo""]"
,,,,,,,"190","188","ACTED_IN","[""Trinity""]"
,,,,,,,"191","188","ACTED_IN","[""Morpheus""]"
,,,,,,,"192","188","ACTED_IN","[""Agent Smith""]"
,,,,,,,"193","188","DIRECTED",""
,,,,,,,"194","188","DIRECTED",""
,,,,,,,"195","188","PRODUCED",""
55 changes: 55 additions & 0 deletions contrib/neo4j-converter/output.rdf
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<_:k_188> <_labels> ":Movie" .
<_:k_188> <born> "" .
<_:k_188> <name> "" .
<_:k_188> <released> "1999" .
<_:k_188> <tagline> "Welcome to the Real World" .
<_:k_188> <title> "The Matrix" .
<_:k_189> <_labels> ":Person" .
<_:k_189> <born> "1964" .
<_:k_189> <name> "Keanu Reeves" .
<_:k_189> <released> "" .
<_:k_189> <tagline> "" .
<_:k_189> <title> "" .
<_:k_190> <_labels> ":Person" .
<_:k_190> <born> "1967" .
<_:k_190> <name> "Carrie-Anne Moss" .
<_:k_190> <released> "" .
<_:k_190> <tagline> "" .
<_:k_190> <title> "" .
<_:k_191> <_labels> ":Person" .
<_:k_191> <born> "1961" .
<_:k_191> <name> "Laurence Fishburne" .
<_:k_191> <released> "" .
<_:k_191> <tagline> "" .
<_:k_191> <title> "" .
<_:k_192> <_labels> ":Person" .
<_:k_192> <born> "1960" .
<_:k_192> <name> "Hugo Weaving" .
<_:k_192> <released> "" .
<_:k_192> <tagline> "" .
<_:k_192> <title> "" .
<_:k_193> <_labels> ":Person" .
<_:k_193> <born> "1967" .
<_:k_193> <name> "Lilly Wachowski" .
<_:k_193> <released> "" .
<_:k_193> <tagline> "" .
<_:k_193> <title> "" .
<_:k_194> <_labels> ":Person" .
<_:k_194> <born> "1965" .
<_:k_194> <name> "Lana Wachowski" .
<_:k_194> <released> "" .
<_:k_194> <tagline> "" .
<_:k_194> <title> "" .
<_:k_195> <_labels> ":Person" .
<_:k_195> <born> "1952" .
<_:k_195> <name> "Joel Silver" .
<_:k_195> <released> "" .
<_:k_195> <tagline> "" .
<_:k_195> <title> "" .
<_:k_189> <ACTED_IN> <_:k_188> ( roles="Neo" ) .
<_:k_190> <ACTED_IN> <_:k_188> ( roles="Trinity" ) .
<_:k_191> <ACTED_IN> <_:k_188> ( roles="Morpheus" ) .
<_:k_192> <ACTED_IN> <_:k_188> ( roles="Agent Smith" ) .
<_:k_193> <DIRECTED> <_:k_188> .
<_:k_194> <DIRECTED> <_:k_188> .
<_:k_195> <PRODUCED> <_:k_188> .
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
github.com/prometheus/client_golang v0.9.3
github.com/prometheus/common v0.4.1 // indirect
github.com/prometheus/procfs v0.0.0-20190517135640-51af30a78b0e // indirect
github.com/sergi/go-diff v1.1.0
github.com/soheilhy/cmux v0.1.4
github.com/spf13/cast v1.3.0
github.com/spf13/cobra v0.0.5
Expand Down