diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 9860e0c..098e50d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,12 +14,12 @@ jobs: # Note the fetch-depth: 0 option is required for the change log to # work correctly with goreleaser. fetch-depth: 0 - - uses: actions/setup-go@v2 + - uses: actions/setup-go@v3 with: - go-version: "^1.15.6" - - uses: golangci/golangci-lint-action@v2.5.2 + go-version: "^1.18" + - uses: golangci/golangci-lint-action@v3.1.0 with: - version: v1.35 + version: v1.45.2 - name: Unit testing run: go test ./... - uses: goreleaser/goreleaser-action@v2 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4a2614f..475ad95 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,16 +11,16 @@ jobs: runs-on: ubuntu-18.04 steps: - uses: actions/checkout@v2 - - uses: actions/setup-go@v2 + - uses: actions/setup-go@v3 with: - go-version: "^1.15.6" - - uses: golangci/golangci-lint-action@v2.5.2 + go-version: "^1.18" + - uses: golangci/golangci-lint-action@v3.1.0 with: - version: v1.35 + version: v1.45.2 - name: Unit testing run: go test ./... - name: Validate Generated Files run: | go install github.com/mailru/easyjson/easyjson@v0.7.7 go generate ./... 
- git diff --quiet || exit 1 \ No newline at end of file + git diff --quiet || exit 1 diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..fdc82b1 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,5 @@ +issues: + exclude-rules: + - text: "unknown JSON option \"intern\"" + linters: + - staticcheck \ No newline at end of file diff --git a/.goreleaser.yml b/.goreleaser.yml index 5864745..7f2e4ec 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -6,6 +6,7 @@ builds: - linux goarch: - amd64 + - arm64 archives: - files: @@ -15,6 +16,7 @@ release: github: owner: coralproject name: coral-importer + prerelease: auto checksum: name_template: "checksums.txt" diff --git a/README.md b/README.md index 8764aa8..2258fbf 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,116 @@ to download a release of the `coral-importer` tool. ## Strategies + +### Legacy + +This import strategy is designed to migrate data from Coral ^4.12.0 to +^6.17.1. + +This strategy requires you to stop any Coral instances that are +interacting with the MongoDB database that Coral uses. The first step to +this process is to dump the files from the remote MongoDB database to +your local machine to perform the migration: + +```bash +#!/bin/bash + +# If this script errors, stop. +set -e + +# Set this to the TALK_MONGO_URL used by Coral. +export TALK_MONGO_URL="..." + +# Set this to a folder where we'll export the documents from your ^4 database. +export CORAL_INPUT_DIRECTORY="$PWD/coral/input" + +# Set this to a folder where we'll export the documents to be uploaded to your +# ^6 database. +export CORAL_OUTPUT_DIRECTORY="$PWD/coral/output" + +# Make the directories used by this and the following tools. +mkdir -p "${CORAL_INPUT_DIRECTORY}" "${CORAL_OUTPUT_DIRECTORY}" + +# Dump each collection to the export directory. This operation can take some +# time for larger data sets. 
+collections=(actions assets comments settings users) +for collection in ${collections[*]} +do + mongoexport --uri "$TALK_MONGO_URL" -c $collection -o "${CORAL_INPUT_DIRECTORY}/${collection}.json" +done + +# Set this to the ID of your Tenant. If you're importing from a ^4 instance +# and do not have a Tenant ID, generate one using `uuidgen`. +export CORAL_TENANT_ID="c2440817-464e-4a8f-8851-24effd8fee9d" + +# Set this to the ID of your Site. If you're importing from a ^4 instance +# and do not have a Site ID, generate one using `uuidgen`. +export CORAL_SITE_ID="3f183f3d-205f-41da-881a-e5089057b78f" + +# This importer tool is designed to work with Coral at the following migration +# version. This is the newest file in the +# https://github.com/coralproject/talk/tree/develop/src/core/server/services/migrate/migrations +# directory for your version of Coral. +export CORAL_MIGRATION_ID="1582929716101" + +# Set this to the file location where you want to export your log files to. +export CORAL_LOG="$PWD/coral/logs.json" + +# Run the importer tool in dry mode to perform document validation before we +# actually write any files. This may take some time and will use about 40% of +# the dataset's size in RAM to perform the validation. +coral-importer legacy --dryRun + +# If the previous command completed successfully, then you can run it for real. +coral-importer legacy + +# This should write all the output files to the `$CORAL_OUTPUT_DIRECTORY` +# directory. If you did not use SSO with your ^4 instance of Coral using +# plugins, you can continue below to the *Importing* section. Otherwise, continue with the mapping steps below. + +# Set this to the name of a directory we can write output files that have +# been mapped. +export CORAL_MAPPER_POST_DIRECTORY="$PWD/coral/post" + +mkdir -p "${CORAL_MAPPER_POST_DIRECTORY}" + +# If your custom SSO plugin saved the User ID in the Users `profiles` array +# as `profiles.id`, like the following: +# +# { +# "profiles": [ +# { +# "provider": "my-auth", +# "id": "..." 
+# } +# ] +# } +# +# Then you should set the following so the mapper can grab the `profiles.id` +# from the profile and map it to a corresponding SSO profile for ^6. +# export CORAL_MAPPER_USERS_SSO_PROVIDER="my-auth" + +# If a custom plugin wrote a users username to a field other than `username`, +# such as: +# +# { +# "metadata": { +# "displayName": "My Name" +# } +# } +# +# Then you should set the following so the mapper can grab the username from +# the other field. +# export CORAL_MAPPER_USERS_USERNAME="metadata.displayName" + +# Run the importer tool in dry mode to perform document validation before we +# actually write any files. +coral-importer legacy --dryRun map + +# If the previous command completed successfully, then you can run it for real. +coral-importer legacy map +``` + ### CSV When importing vis CSV, each column must be provided, even if it is optional. In @@ -51,7 +161,7 @@ iconv -f ISO88592 -t UTF8 < users.csv > users-clean.csv #### Format -`data/csv/users.csv`: +**`data/csv/users.csv`**: | # | Column | Type | Required | Description | | --- | ---------- | ------- | -------- | -------------------------------------------------------------------------------------------------- | @@ -62,7 +172,7 @@ iconv -f ISO88592 -t UTF8 < users.csv > users-clean.csv | 4 | banned | boolean | no | Can be one of `true` or `false` (Default `false`). | | 5 | created_at | string | no | [ISO8601](http://en.wikipedia.org/wiki/ISO_8601) formatted date string (Defaults to current date). 
| -`data/csv/stories.csv`: +**`data/csv/stories.csv`**: | # | Column | Type | Required | Description | | --- | ------------ | ------ | -------- | -------------------------------------------------------------------------------------------------------------------------------------- | @@ -74,7 +184,7 @@ iconv -f ISO88592 -t UTF8 < users.csv > users-clean.csv | 5 | closed_at | string | No | Date as a [ISO8601](http://en.wikipedia.org/wiki/ISO_8601) formatted date string when commenting was closed (Default is unset). | | 6 | mode | string | No | Story mode, can be one of `COMMENTS`, `QA`, or `RATINGS_AND_REVIEWS` (Default `COMMENTS`) | -`data/csv/comments.csv`: +**`data/csv/comments.csv`**: | # | Column | Type | Required | Description | | --- | ---------- | ------ | -------- | -------------------------------------------------------------------------------------------------------------- | @@ -87,137 +197,37 @@ iconv -f ISO88592 -t UTF8 < users.csv > users-clean.csv | 6 | status | string | No | Status of the Comment, can be one of `APPROVED`, `REJECTED`, or `NONE` (Default's to `NONE`). | | 7 | rating | number | No | Rating attached to the Comment | -### Legacy - -```sh -DATABASE_NAME=coral -MONGO_CONTAINER_ID=mongo-export - -# Make the export directory. -mkdir -p "export" - -# Dump each collection to the export directory. This operation can take some -# time for larger data sets. -collections=(actions assets comments settings users) -for collection in ${collections[*]} -do - docker run --rm -ti --link $MONGO_CONTAINER_ID:mongo-export -v $PWD/export:/mnt/export mongo:4.2 mongoexport --host mongo-export -d $DATABASE_NAME -c $collection -o /mnt/export/${collection}.json -done - -# This now provides the export files that can be processed by the importer. 
-TENANT_ID=c2440817-464e-4a8f-8851-24effd8fee9d -SITE_ID=3f183f3d-205f-41da-881a-e5089057b78f -INPUT=data/legacy -OUTPUT=database - -coral-importer --quiet legacy --input $INPUT --tenantID $TENANT_ID --siteID $SITE_ID --output $OUTPUT 2> output.log -``` - -### Livefyre +## Importing ```sh -# Perform the import parsing operation. -TENANT_ID=c2440817-464e-4a8f-8851-24effd8fee9d -SITE_ID=3f183f3d-205f-41da-881a-e5089057b78f -USER_INPUT=data/livefyre/users.json -COMMENTS_INPUT=data/livefyre/comments.json -OUTPUT=database - -coral-importer --quiet livefyre --users $USER_INPUT --comments $COMMENTS_INPUT --tenantID $TENANT_ID --siteID $SITE_ID --output $OUTPUT 2> output.log -``` +# Set this to the MONGO_URL used by the new Coral ^6 instance. This should be +# different than the TALK_MONGO_URL used by ^4. +export MONGO_URL="..." -## Importing +# Set this to a folder where we'll export the documents to be uploaded to your +# ^6 database. +export CORAL_OUTPUT_DIRECTORY="$PWD/coral/output" -```sh -# Upload the generated imports to MongoDB. -TARGET_MONGO_CONTAINER=mongo -DATABASE_NAME=coral -CONCURRENCY=$(sysctl -n hw.ncpu) +# This command should get the number of CPUs available on your machine, +# otherwise if it fails just set it to the number of CPUs manually. +export CONCURRENCY="$(sysctl -n hw.ncpu)" +# For each of these collections, import them into the new MongoDB database. collections=(commentActions stories users comments) for collection in ${collections[*]} do - if [ ! 
-f $PWD/$OUTPUT/$collection.json ] - then - echo "$PWD/$OUTPUT/$collection.json does not exist, not importing $collection collection" - continue - fi - - docker run --rm -ti -v $PWD:/mnt/import --link mongo:$TARGET_MONGO_CONTAINER mongo:4.2 mongoimport --uri=mongodb://mongo/$DATABASE_NAME --file=/mnt/import/$OUTPUT/$collection.json --collection $collection --numInsertionWorkers $CONCURRENCY + mongoimport --uri "$MONGO_URL" --file "${CORAL_OUTPUT_DIRECTORY}/$collection.json" --collection "$collection" --numInsertionWorkers $CONCURRENCY done ``` If you're updating documents: ```sh -collections=(commentActions stories users comments) -for collection in ${collections[*]} -do - if [ ! -f $PWD/$OUTPUT/$collection.json ] - then - echo "$PWD/$OUTPUT/$collection.json does not exist, not importing $collection collection" - continue - fi - - docker run --rm -ti -v $PWD:/mnt/import --link mongo:$TARGET_MONGO_CONTAINER mongo:4.2 mongoimport --uri=mongodb://mongo/$DATABASE_NAME --file=/mnt/import/$OUTPUT/$collection.json --collection $collection --numInsertionWorkers $CONCURRENCY --mode upsert --upsertFields id -done -``` - -## Tricks - -Print all the active operations on your database with messages. - -```js -db.currentOp() - .inprog.filter((op) => Boolean(op.msg)) - .map((op) => ({ ns: op.ns, msg: op.msg, command: op.command })); -``` -## Benchmarks - -| | | -| ------------------ | ----- | -| Running importer | 1m30s | -| Importing Comments | 4m30s | -| Importing Stories | 4s | -| Importing Users | 30s | -| Rebuilding Indexes | 20m | - -## Developing - -```sh -collections=(actions assets comments settings users) +# For each of these collections, import them into the new MongoDB database. 
+collections=(commentActions stories users comments) for collection in ${collections[*]} do - head ${collection}.json > ${collection}_sample.json - head -n1 ${collection}.json > ${collection}_single.json + mongoimport --uri "$MONGO_URL" --file "${CORAL_OUTPUT_DIRECTORY}/$collection.json" --collection "$collection" --numInsertionWorkers $CONCURRENCY --mode upsert --upsertFields id done -``` - -To update models run: -```sh -go generate ./... -``` - -To locally build an executable for testing use the output flag: -```sh -go build -o -``` - -## Changelog - -### v0.4.13 -- (coral) handles `metadata.displayName` values if present on legacy users - -### v0.4.3 - -- (coral) monotonic timestamps will no longer reset and cause a conflict if more - than 1000 are created in the same second. - -### v0.4.1 - -- (coral) `createdAt` timestamps that are used by Coral as cursors now are - emitted as unique timestamps when not disabled with the - `--disableMonotonicCursorTimes` flag. This means that every timestamp emitted - that shares the same second time will automatically have it's ms time - incremented to prevent collisions. +``` \ No newline at end of file diff --git a/common/cli.go b/common/cli.go deleted file mode 100644 index 06b0472..0000000 --- a/common/cli.go +++ /dev/null @@ -1,24 +0,0 @@ -package common - -import ( - "github.com/sirupsen/logrus" - "github.com/urfave/cli" -) - -// ConfigureLogger will configure the global logger based on global flags. 
-func ConfigureLogger(c *cli.Context) error { - quiet := c.GlobalBool("quiet") - json := c.GlobalBool("json") - - if quiet { - logrus.SetLevel(logrus.InfoLevel) - } else { - logrus.SetLevel(logrus.DebugLevel) - } - - if json { - logrus.SetFormatter(&logrus.JSONFormatter{}) - } - - return nil -} diff --git a/common/coral/commentActions.go b/common/coral/commentActions.go index 6cc8bc7..23977fa 100644 --- a/common/coral/commentActions.go +++ b/common/coral/commentActions.go @@ -6,13 +6,13 @@ import "time" // CommentAction is the base Coral Comment Action that represents an action // against a Comment. type CommentAction struct { - TenantID string `json:"tenantID" validate:"required"` + TenantID string `json:"tenantID,intern" validate:"required"` ID string `json:"id" conform:"trim" validate:"required"` - SiteID string `json:"siteID" validate:"required"` - ActionType string `json:"actionType" validate:"oneof=REACTION DONT_AGREE FLAG,required"` + SiteID string `json:"siteID,intern" validate:"required"` + ActionType string `json:"actionType,intern" validate:"oneof=REACTION DONT_AGREE FLAG,required"` CommentID string `json:"commentID" validate:"required"` CommentRevisionID string `json:"commentRevisionID" validate:"required"` - Reason string `json:"reason,omitempty" validate:"omitempty,oneof= COMMENT_DETECTED_BANNED_WORD COMMENT_DETECTED_LINKS COMMENT_DETECTED_PREMOD_USER COMMENT_DETECTED_RECENT_HISTORY COMMENT_DETECTED_REPEAT_POST COMMENT_DETECTED_SPAM COMMENT_DETECTED_SUSPECT_WORD COMMENT_DETECTED_TOXIC COMMENT_REPORTED_OFFENSIVE COMMENT_REPORTED_OTHER COMMENT_REPORTED_SPAM"` + Reason string `json:"reason,omitempty,intern" validate:"omitempty,oneof= COMMENT_DETECTED_BANNED_WORD COMMENT_DETECTED_LINKS COMMENT_DETECTED_PREMOD_USER COMMENT_DETECTED_RECENT_HISTORY COMMENT_DETECTED_REPEAT_POST COMMENT_DETECTED_SPAM COMMENT_DETECTED_SUSPECT_WORD COMMENT_DETECTED_TOXIC COMMENT_REPORTED_OFFENSIVE COMMENT_REPORTED_OTHER COMMENT_REPORTED_SPAM"` AdditionalDetails string 
`json:"additionalDetails,omitempty"` StoryID string `json:"storyID" validate:"required"` UserID *string `json:"userID" validate:"required"` diff --git a/common/coral/commentActions_easyjson.go b/common/coral/commentActions_easyjson.go index eff8a60..5bccfed 100644 --- a/common/coral/commentActions_easyjson.go +++ b/common/coral/commentActions_easyjson.go @@ -37,19 +37,19 @@ func easyjsonF96a437cDecodeGithubComCoralprojectCoralImporterCommonCoral(in *jle } switch key { case "tenantID": - out.TenantID = string(in.String()) + out.TenantID = string(in.StringIntern()) case "id": out.ID = string(in.String()) case "siteID": - out.SiteID = string(in.String()) + out.SiteID = string(in.StringIntern()) case "actionType": - out.ActionType = string(in.String()) + out.ActionType = string(in.StringIntern()) case "commentID": out.CommentID = string(in.String()) case "commentRevisionID": out.CommentRevisionID = string(in.String()) case "reason": - out.Reason = string(in.String()) + out.Reason = string(in.StringIntern()) case "additionalDetails": out.AdditionalDetails = string(in.String()) case "storyID": diff --git a/common/coral/comments.go b/common/coral/comments.go index 6e86edb..dae9c01 100644 --- a/common/coral/comments.go +++ b/common/coral/comments.go @@ -5,7 +5,7 @@ import "time" type RevisionPerspective struct { Score float64 `json:"score"` - Model string `json:"model"` + Model string `json:"model,intern"` } // RevisionMetadata is the metadata associated with a given Revision for a @@ -26,23 +26,23 @@ type Revision struct { // CommentTag is a Tag associated with a Comment in Coral. type CommentTag struct { - Type string `json:"type" conform:"trim" validate:"oneof=STAFF FEATURED REVIEW QUESTION,required"` + Type string `json:"type,intern" conform:"trim" validate:"oneof=STAFF FEATURED REVIEW QUESTION,required"` CreatedBy string `json:"createdBy,omitempty"` CreatedAt Time `json:"createdAt" validate:"required"` } // Comment is the base Coral Comment that is used in Coral. 
type Comment struct { - TenantID string `json:"tenantID" validate:"required"` + TenantID string `json:"tenantID,intern" validate:"required"` ID string `json:"id" conform:"trim" validate:"required"` - SiteID string `json:"siteID" validate:"required"` + SiteID string `json:"siteID,intern" validate:"required"` AncestorIDs []string `json:"ancestorIDs" validate:"required"` ParentID string `json:"parentID,omitempty" conform:"trim"` ParentRevisionID string `json:"parentRevisionID,omitempty" conform:"trim"` AuthorID string `json:"authorID" conform:"trim" validate:"required"` StoryID string `json:"storyID" conform:"trim" validate:"required"` Revisions []Revision `json:"revisions" validate:"required"` - Status string `json:"status" conform:"trim" validate:"oneof=NONE APPROVED REJECTED PREMOD SYSTEM_WITHHELD,required"` + Status string `json:"status,intern" conform:"trim" validate:"oneof=NONE APPROVED REJECTED PREMOD SYSTEM_WITHHELD,required"` ActionCounts map[string]int `json:"actionCounts" validate:"required"` ChildIDs []string `json:"childIDs" validate:"required"` Tags []CommentTag `json:"tags" validate:"required"` diff --git a/common/coral/comments_easyjson.go b/common/coral/comments_easyjson.go index 89d7ac0..51a4d26 100644 --- a/common/coral/comments_easyjson.go +++ b/common/coral/comments_easyjson.go @@ -39,7 +39,7 @@ func easyjsonD09abad2DecodeGithubComCoralprojectCoralImporterCommonCoral(in *jle case "score": out.Score = float64(in.Float64()) case "model": - out.Model = string(in.String()) + out.Model = string(in.StringIntern()) default: in.SkipRecursive() } @@ -331,7 +331,7 @@ func easyjsonD09abad2DecodeGithubComCoralprojectCoralImporterCommonCoral3(in *jl } switch key { case "type": - out.Type = string(in.String()) + out.Type = string(in.StringIntern()) case "createdBy": out.CreatedBy = string(in.String()) case "createdAt": @@ -413,11 +413,11 @@ func easyjsonD09abad2DecodeGithubComCoralprojectCoralImporterCommonCoral4(in *jl } switch key { case "tenantID": - 
out.TenantID = string(in.String()) + out.TenantID = string(in.StringIntern()) case "id": out.ID = string(in.String()) case "siteID": - out.SiteID = string(in.String()) + out.SiteID = string(in.StringIntern()) case "ancestorIDs": if in.IsNull() { in.Skip() @@ -473,7 +473,7 @@ func easyjsonD09abad2DecodeGithubComCoralprojectCoralImporterCommonCoral4(in *jl in.Delim(']') } case "status": - out.Status = string(in.String()) + out.Status = string(in.StringIntern()) case "actionCounts": if in.IsNull() { in.Skip() diff --git a/common/coral/stories.go b/common/coral/stories.go index c815f03..6b51c2e 100644 --- a/common/coral/stories.go +++ b/common/coral/stories.go @@ -81,9 +81,9 @@ type StoryMetadata struct { // Story is the base Coral Story that is used in Coral. type Story struct { - TenantID string `json:"tenantID" validate:"required"` + TenantID string `json:"tenantID,intern" validate:"required"` ID string `json:"id" conform:"trim" validate:"required"` - SiteID string `json:"siteID" validate:"required"` + SiteID string `json:"siteID,intern" validate:"required"` URL string `json:"url" validate:"required,url"` CommentCounts StoryCommentCounts `json:"commentCounts"` Settings StorySettings `json:"settings"` diff --git a/common/coral/stories_easyjson.go b/common/coral/stories_easyjson.go index 95eb274..2a8674f 100644 --- a/common/coral/stories_easyjson.go +++ b/common/coral/stories_easyjson.go @@ -399,11 +399,11 @@ func easyjsonE08b5de9DecodeGithubComCoralprojectCoralImporterCommonCoral3(in *jl } switch key { case "tenantID": - out.TenantID = string(in.String()) + out.TenantID = string(in.StringIntern()) case "id": out.ID = string(in.String()) case "siteID": - out.SiteID = string(in.String()) + out.SiteID = string(in.StringIntern()) case "url": out.URL = string(in.String()) case "commentCounts": diff --git a/common/coral/time.go b/common/coral/time.go index 42c10a8..0ead752 100644 --- a/common/coral/time.go +++ b/common/coral/time.go @@ -5,6 +5,7 @@ import ( "reflect" 
"time" + "github.com/coralproject/coral-importer/internal/warnings" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -37,7 +38,9 @@ func (t *Time) UnmarshalJSON(data []byte) error { // Try to handle the case where we get something that looks like // this: {"$date":{"$numberLong":"-62075098782000"}} if _, ok := obj["$numberLong"].(string); ok { - logrus.Warn("saw a date in the format: { $date: { $numberLong: \"-62075098782000\" } }") + warnings.UnsupportedDateFormat.Once(func() { + logrus.Warn("saw a date in the format: { $date: { $numberLong: \"-62075098782000\" } }") + }) return nil } @@ -45,7 +48,9 @@ func (t *Time) UnmarshalJSON(data []byte) error { // Try to handle the case where we get something that looks like // this: {"$date":{"$numberLong":-62075098782000"} if _, ok := obj["$numberLong"].(float64); ok { - logrus.Warn("saw a date in the format: { $date: { $numberLong: \"-62075098782000\" } }") + warnings.UnsupportedDateFormat.Once(func() { + logrus.Warn("saw a date in the format: { $date: { $numberLong: \"-62075098782000\" } }") + }) return nil } diff --git a/common/coral/users.go b/common/coral/users.go index 38d15ca..a001843 100644 --- a/common/coral/users.go +++ b/common/coral/users.go @@ -16,7 +16,7 @@ type UserNotifications struct { OnFeatured bool `json:"onFeatured"` OnStaffReplies bool `json:"onStaffReplies"` OnModeration bool `json:"onModeration"` - DigestFrequency string `json:"digestFrequency" validate:"oneof=NONE DAILY HOURLY"` + DigestFrequency string `json:"digestFrequency,intern" validate:"oneof=NONE DAILY HOURLY"` } func NewUserNotifications() UserNotifications { @@ -123,12 +123,12 @@ type UserCommentCounts struct { } type User struct { - TenantID string `json:"tenantID" validate:"required"` + TenantID string `json:"tenantID,intern" validate:"required"` ID string `json:"id" conform:"trim" validate:"required"` Username string `json:"username" validate:"required"` Email string `json:"email,omitempty" conform:"email,lower" 
validate:"email"` Profiles []UserProfile `json:"profiles,omitempty"` - Role string `json:"role" validate:"required,oneof=COMMENTER STAFF MODERATOR ADMIN"` + Role string `json:"role,intern" validate:"required,oneof=COMMENTER STAFF MODERATOR ADMIN"` Notifications UserNotifications `json:"notifications"` ModeratorNotes []string `json:"moderatorNotes"` Status UserStatus `json:"status"` diff --git a/common/coral/users_easyjson.go b/common/coral/users_easyjson.go index ac4d893..0e1f559 100644 --- a/common/coral/users_easyjson.go +++ b/common/coral/users_easyjson.go @@ -918,7 +918,7 @@ func easyjson84c0690eDecodeGithubComCoralprojectCoralImporterCommonCoral9(in *jl case "onModeration": out.OnModeration = bool(in.Bool()) case "digestFrequency": - out.DigestFrequency = string(in.String()) + out.DigestFrequency = string(in.StringIntern()) default: in.SkipRecursive() } @@ -1271,7 +1271,7 @@ func easyjson84c0690eDecodeGithubComCoralprojectCoralImporterCommonCoral13(in *j } switch key { case "tenantID": - out.TenantID = string(in.String()) + out.TenantID = string(in.StringIntern()) case "id": out.ID = string(in.String()) case "username": @@ -1302,7 +1302,7 @@ func easyjson84c0690eDecodeGithubComCoralprojectCoralImporterCommonCoral13(in *j in.Delim(']') } case "role": - out.Role = string(in.String()) + out.Role = string(in.StringIntern()) case "notifications": (out.Notifications).UnmarshalEasyJSON(in) case "moderatorNotes": diff --git a/common/pipeline/aggregating_processor.go b/common/pipeline/aggregating_processor.go deleted file mode 100644 index 6495cc1..0000000 --- a/common/pipeline/aggregating_processor.go +++ /dev/null @@ -1,56 +0,0 @@ -package pipeline - -import ( - "runtime" - - "github.com/pkg/errors" -) - -type AggregationWriter func(collection, key, value string) - -type AggregatingProcessor func(writer AggregationWriter, input *TaskReaderInput) error - -func HandleAggregatingProcessor(in <-chan TaskReaderInput, process AggregatingProcessor) <-chan 
TaskAggregatorOutput { - out := make(chan TaskAggregatorOutput) - - writeToOutput := func(collection, key, value string) { - out <- TaskAggregatorOutput{ - Collection: collection, - Key: key, - Value: value, - } - } - - go func() { - defer close(out) - for n := range in { - if n.Error != nil { - out <- TaskAggregatorOutput{ - Error: errors.Wrap(n.Error, "error occurred on stack"), - } - - return - } - - if err := process(writeToOutput, &n); err != nil { - out <- TaskAggregatorOutput{ - Error: errors.Wrap(err, "error occurred during processing"), - } - - return - } - } - }() - - return out -} - -// FanAggregatingProcessor will fan the processor across the number of CPU's available. -func FanAggregatingProcessor(input <-chan TaskReaderInput, process AggregatingProcessor) []<-chan TaskAggregatorOutput { - out := make([]<-chan TaskAggregatorOutput, runtime.NumCPU()) - for i := range out { - out[i] = HandleAggregatingProcessor(input, process) - } - - return out -} diff --git a/common/pipeline/aggregator.go b/common/pipeline/aggregator.go deleted file mode 100644 index c03e11c..0000000 --- a/common/pipeline/aggregator.go +++ /dev/null @@ -1,72 +0,0 @@ -package pipeline - -import ( - "sync" - - "github.com/pkg/errors" -) - -type TaskAggregatorOutput struct { - Error error - Key string - Value string - Collection string -} - -// MergeTaskAggregatorOutputPipelines will collect all results from the input channels -// and output them on a single channel. -func MergeTaskAggregatorOutputPipelines(cs []<-chan TaskAggregatorOutput) <-chan TaskAggregatorOutput { - var wg sync.WaitGroup - out := make(chan TaskAggregatorOutput) - - // Start an output goroutine for each input channel in cs. output - // copies values from c to out until c is closed, then calls wg.Done. 
- output := func(c <-chan TaskAggregatorOutput) { - for n := range c { - out <- n - } - wg.Done() - } - - wg.Add(len(cs)) - - for _, c := range cs { - go output(c) - } - - // Start a goroutine to close out once all the output goroutines are - // done. This must start after the wg.Add call. - go func() { - wg.Wait() - close(out) - }() - - return out -} - -func NewMapAggregator(input <-chan TaskAggregatorOutput) (map[string]map[string][]string, error) { - out := make(map[string]map[string][]string) - - for task := range input { - if task.Error != nil { - return nil, errors.Wrap(task.Error, "error occurred on stack") - } - - // Get the results map to add to the collectio results. - if _, ok := out[task.Collection]; !ok { - out[task.Collection] = make(map[string][]string) - } - - // Ensure that the specific key we're adding to already exists. - if _, ok := out[task.Collection][task.Key]; !ok { - out[task.Collection][task.Key] = []string{} - } - - // Push the value onto the map. - if task.Value != "" { - out[task.Collection][task.Key] = append(out[task.Collection][task.Key], task.Value) - } - } - - return out, nil -} diff --git a/common/pipeline/reader.go b/common/pipeline/reader.go deleted file mode 100644 index 9057592..0000000 --- a/common/pipeline/reader.go +++ /dev/null @@ -1,122 +0,0 @@ -package pipeline - -import ( - "bufio" - "encoding/csv" - "io" - "os" - - "github.com/pkg/errors" -) - -type TaskReaderInput struct { - Error error - Line int - Input string - Fields []string -} - -func NewCSVFileReader(fileName string, fieldsPerRecord int) <-chan TaskReaderInput { - out := make(chan TaskReaderInput) - go func() { - defer close(out) - - // Open that file for reading. - f, err := os.Open(fileName) - if err != nil { - out <- TaskReaderInput{ - Error: errors.Wrap(err, "could not open --input for reading"), - } - - return - } - defer f.Close() - - // Setup the reader. 
- r := csv.NewReader(bufio.NewReader(f)) - r.FieldsPerRecord = fieldsPerRecord - r.TrimLeadingSpace = true - - // Keep track of the processed lines. - lines := 0 - - // Start reading the stories line by line from the file. - for { - fields, err := r.Read() - if err != nil { - if errors.Is(err, io.EOF) { - break - } - - out <- TaskReaderInput{ - Error: errors.Wrap(err, "couldn't read the file"), - } - - return - } - - // Increment the line count. - lines++ - - // Send the input to a processor. - out <- TaskReaderInput{ - Line: lines, - Fields: fields, - } - } - }() - - return out -} - -// NewJSONFileReader will read from a file and emit new TaskInput's from the lines. -func NewJSONFileReader(fileName string) <-chan TaskReaderInput { - out := make(chan TaskReaderInput) - go func() { - defer close(out) - - // Open that file for reading. - f, err := os.Open(fileName) - if err != nil { - out <- TaskReaderInput{ - Error: errors.Wrap(err, "could not open --input for reading"), - } - - return - } - defer f.Close() - - // Setup the scanner. - r := bufio.NewReader(f) - - // Keep track of the processed lines. - lines := 0 - - // Start reading the stories line by line from the file. - for { - line, err := r.ReadString('\n') - if err != nil { - if errors.Is(err, io.EOF) { - break - } - - out <- TaskReaderInput{ - Error: errors.Wrap(err, "couldn't read the file"), - } - - return - } - - // Increment the line count. - lines++ - - // Send the input to a processor. 
- out <- TaskReaderInput{ - Line: lines, - Input: line, - } - } - }() - - return out -} diff --git a/common/pipeline/summer.go b/common/pipeline/summer.go deleted file mode 100644 index 559be70..0000000 --- a/common/pipeline/summer.go +++ /dev/null @@ -1,68 +0,0 @@ -package pipeline - -import ( - "sync" - - "github.com/pkg/errors" -) - -type TaskSummerOutput struct { - Error error - Collection string - Key string - Value int -} - -func MergeTaskSummerOutputPipelines(cs []<-chan TaskSummerOutput) <-chan TaskSummerOutput { - var wg sync.WaitGroup - out := make(chan TaskSummerOutput) - - // Start an output goroutine for each input channel in cs. output - // copies values from c to out until c is closed, then calls wg.Done. - output := func(c <-chan TaskSummerOutput) { - for n := range c { - out <- n - } - wg.Done() - } - wg.Add(len(cs)) - for _, c := range cs { - go output(c) - } - - // Start a goroutine to close out once all the output goroutines are - // done. This must start after the wg.Add call. - go func() { - wg.Wait() - close(out) - }() - - return out -} - -func NewSummer(input <-chan TaskSummerOutput) (map[string]map[string]int, error) { - out := make(map[string]map[string]int) - - for task := range input { - if task.Error != nil { - return nil, errors.Wrap(task.Error, "error occurred on stack") - } - - // Get the results map to add to the collection results. - if _, ok := out[task.Collection]; !ok { - out[task.Collection] = make(map[string]int) - } - - // Ensure that the specific key we're adding to already exists. - if _, ok := out[task.Collection][task.Key]; !ok { - out[task.Collection][task.Key] = 0 - } - - // Push the value onto the map. 
- if task.Value != 0 { - out[task.Collection][task.Key] += task.Value - } - } - - return out, nil -} diff --git a/common/pipeline/summer_processor.go b/common/pipeline/summer_processor.go deleted file mode 100644 index fff407c..0000000 --- a/common/pipeline/summer_processor.go +++ /dev/null @@ -1,56 +0,0 @@ -package pipeline - -import ( - "runtime" - - "github.com/pkg/errors" -) - -type SummerWriter func(collection, key string, value int) - -type SummerProcessor func(writer SummerWriter, input *TaskReaderInput) error - -func HandleSummerProcessor(in <-chan TaskReaderInput, process SummerProcessor) <-chan TaskSummerOutput { - out := make(chan TaskSummerOutput) - - writeToOutput := func(collection, key string, value int) { - out <- TaskSummerOutput{ - Collection: collection, - Key: key, - Value: value, - } - } - - go func() { - defer close(out) - for n := range in { - if n.Error != nil { - out <- TaskSummerOutput{ - Error: errors.Wrap(n.Error, "error occurred on stack"), - } - - return - } - - if err := process(writeToOutput, &n); err != nil { - out <- TaskSummerOutput{ - Error: errors.Wrap(err, "error occurred during processing"), - } - - return - } - } - }() - - return out -} - -// FanSummerProcessor will fan the processor across the number of CPU's available. 
-func FanSummerProcessor(input <-chan TaskReaderInput, process SummerProcessor) []<-chan TaskSummerOutput { - out := make([]<-chan TaskSummerOutput, runtime.NumCPU()) - for i := range out { - out[i] = HandleSummerProcessor(input, process) - } - - return out -} diff --git a/common/pipeline/writer.go b/common/pipeline/writer.go deleted file mode 100644 index f5987c1..0000000 --- a/common/pipeline/writer.go +++ /dev/null @@ -1,94 +0,0 @@ -package pipeline - -import ( - "bufio" - "fmt" - "os" - "path/filepath" - "sync" - - "github.com/pkg/errors" -) - -type TaskWriterOutput struct { - Error error - Collection string - Document []byte -} - -// MergeTaskWriterOutputPipelines will collect all results from the input channels -// and output them on a single channel. -func MergeTaskWriterOutputPipelines(cs []<-chan TaskWriterOutput) <-chan TaskWriterOutput { - var wg sync.WaitGroup - out := make(chan TaskWriterOutput) - - // Start an output goroutine for each input channel in cs. output - // copies values from c to out until c is closed, then calls wg.Done. - output := func(c <-chan TaskWriterOutput) { - for n := range c { - out <- n - } - wg.Done() - } - wg.Add(len(cs)) - for _, c := range cs { - go output(c) - } - - // Start a goroutine to close out once all the output goroutines are - // done. This must start after the wg.Add call. - go func() { - wg.Wait() - close(out) - }() - - return out -} - -// NewFileWriter will write the outputs out based on the output. -func NewFileWriter(folder string, input <-chan TaskWriterOutput) error { - writers := make(map[string]*bufio.Writer) - - for task := range input { - if task.Error != nil { - return errors.Wrap(task.Error, "error occurred on stack") - } - - writer, ok := writers[task.Collection] - if !ok { - // Ensure that the folder exists. 
- if _, err := os.Stat(folder); os.IsNotExist(err) { - if err := os.Mkdir(folder, 0o755); err != nil { - return errors.Wrap(err, "can't make output directory") - } - } - - // Create the file to write to. - f, err := os.Create(filepath.Join(folder, fmt.Sprintf("%s.json", task.Collection))) - if err != nil { - return errors.Wrap(err, "could not create file") - } - //nolint:staticcheck - defer f.Close() - - // Wrap this file in a buffered writer. - writer = bufio.NewWriter(f) - //nolint:staticcheck - defer writer.Flush() - - // Link the writer to the map of writers. - writers[task.Collection] = writer - } - - // Write the document out. - if _, err := writer.Write(task.Document); err != nil { - return errors.Wrap(err, "could not write") - } - - if _, err := writer.WriteString("\n"); err != nil { - return errors.Wrap(err, "could not write") - } - } - - return nil -} diff --git a/common/pipeline/writing_processor.go b/common/pipeline/writing_processor.go deleted file mode 100644 index d202094..0000000 --- a/common/pipeline/writing_processor.go +++ /dev/null @@ -1,68 +0,0 @@ -package pipeline - -import ( - "runtime" - - "github.com/mailru/easyjson" - "github.com/pkg/errors" -) - -// CollectionWriter will write a document (serialized) out to the collection -// import file. -type CollectionWriter func(collection string, doc easyjson.Marshaler) error - -// WritingProcessor is the actual processor that receives a line. -type WritingProcessor func(writer CollectionWriter, input *TaskReaderInput) error - -// HandleWritingProcessor will wrap a Processor around a fanning input queue and -// collect the results into an output queue. 
-func HandleWritingProcessor(in <-chan TaskReaderInput, process WritingProcessor) <-chan TaskWriterOutput { - out := make(chan TaskWriterOutput) - - writeToOutput := func(collection string, doc easyjson.Marshaler) error { - bytes, err := easyjson.Marshal(doc) - if err != nil { - return errors.Wrap(err, "could not marshal output") - } - - out <- TaskWriterOutput{ - Collection: collection, - Document: bytes, - } - - return nil - } - - go func() { - defer close(out) - for n := range in { - if n.Error != nil { - out <- TaskWriterOutput{ - Error: errors.Wrap(n.Error, "error occurred on stack"), - } - - return - } - - if err := process(writeToOutput, &n); err != nil { - out <- TaskWriterOutput{ - Error: errors.Wrap(err, "error occurred during processing"), - } - - return - } - } - }() - - return out -} - -// FanWritingProcessors will fan the processor across the number of CPU's available. -func FanWritingProcessors(input <-chan TaskReaderInput, process WritingProcessor) []<-chan TaskWriterOutput { - out := make([]<-chan TaskWriterOutput, runtime.NumCPU()) - for i := range out { - out[i] = HandleWritingProcessor(input, process) - } - - return out -} diff --git a/go.mod b/go.mod index 666b0ea..7439ce1 100644 --- a/go.mod +++ b/go.mod @@ -1,23 +1,39 @@ module github.com/coralproject/coral-importer -go 1.15 +go 1.18 require ( - github.com/corpix/uarand v0.1.1 // indirect - github.com/etgryphon/stringUp v0.0.0-20121020160746-31534ccd8cac // indirect - github.com/go-playground/locales v0.12.1 // indirect - github.com/go-playground/universal-translator v0.16.0 // indirect - github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 // indirect - github.com/kr/pretty v0.1.0 // indirect - github.com/leebenson/conform v0.0.0-20190822094432-4c55492f71d7 - github.com/leodido/go-urn v1.1.0 // indirect + github.com/cheggaaa/pb/v3 v3.0.8 + github.com/fatih/color v1.13.0 + github.com/josharian/intern v1.0.0 + github.com/kelseyhightower/envconfig v1.4.0 + github.com/leebenson/conform 
v1.2.2 github.com/mailru/easyjson v0.7.7 - github.com/microcosm-cc/bluemonday v1.0.17 + github.com/microcosm-cc/bluemonday v1.0.18 github.com/pkg/errors v0.9.1 github.com/satori/go.uuid v1.2.0 github.com/sirupsen/logrus v1.8.1 - github.com/urfave/cli v1.22.5 + github.com/urfave/cli/v2 v2.4.0 + gopkg.in/go-playground/validator.v9 v9.31.0 +) + +require ( + github.com/VividCortex/ewma v1.2.0 // indirect + github.com/aymerick/douceur v0.2.0 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect + github.com/etgryphon/stringUp v0.0.0-20121020160746-31534ccd8cac // indirect + github.com/go-playground/locales v0.14.0 // indirect + github.com/go-playground/universal-translator v0.18.0 // indirect + github.com/gorilla/css v1.0.0 // indirect + github.com/kr/pretty v0.1.0 // indirect + github.com/leodido/go-urn v1.2.1 // indirect + github.com/mattn/go-colorable v0.1.12 // indirect + github.com/mattn/go-isatty v0.0.14 // indirect + github.com/mattn/go-runewidth v0.0.13 // indirect + github.com/rivo/uniseg v0.2.0 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect + golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/go-playground/assert.v1 v1.2.1 // indirect - gopkg.in/go-playground/validator.v9 v9.31.0 ) diff --git a/go.sum b/go.sum index d4e31b0..f8b1060 100644 --- a/go.sum +++ b/go.sum @@ -2,67 +2,97 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/Masterminds/glide v0.13.2/go.mod h1:STyF5vcenH/rUqTEv+/hBXlSTo7KYwg2oc2f4tzPWic= github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/Masterminds/vcs v1.13.0/go.mod h1:N09YCmOQr6RLxC6UNHzuVwAdodYbbnycGHSmwVJjcKA= +github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= +github.com/VividCortex/ewma v1.2.0 
h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= +github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= +github.com/cheggaaa/pb/v3 v3.0.8 h1:bC8oemdChbke2FHIIGy9mn4DPJ2caZYQnfbRqwmdCoA= +github.com/cheggaaa/pb/v3 v3.0.8/go.mod h1:UICbiLec/XO6Hw6k+BHEtHeQFzzBH4i2/qk/ow1EJTA= github.com/codegangsta/cli v1.20.0/go.mod h1:/qJNoX69yVSKu5o4jLyXAENLRyk1uhi7zkbQ3slBdOA= github.com/corpix/uarand v0.1.1 h1:RMr1TWc9F4n5jiPDzFHtmaUXLKLNUFK0SgCLo4BhX/U= github.com/corpix/uarand v0.1.1/go.mod h1:SFKZvkcRoLqVRFZ4u25xPmp6m9ktANfbpXZ7SJ0/FNU= -github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= -github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= +github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/etgryphon/stringUp v0.0.0-20121020160746-31534ccd8cac h1:YFKhR0PR8mPI+6EdPhW9BXobntXx3v3F4/1Z9xmw8t8= github.com/etgryphon/stringUp v0.0.0-20121020160746-31534ccd8cac/go.mod h1:Vd+6pUuXoxJuiYG9i6uqoew9XOpXVE9w4OovDqwM8NY= -github.com/go-playground/locales v0.12.1 h1:2FITxuFt/xuCNP1Acdhv62OzaCiviiE4kotfhkmOqEc= -github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= -github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rmGrCjJ8eAeUP/K/EKx4DM= -github.com/go-playground/universal-translator 
v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= +github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= +github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= +github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= +github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= +github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= +github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= +github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/gorilla/css v1.0.0 h1:BQqNyPTi50JCFMTw/b67hByjMVXZRwGha6wxVGkeihY= github.com/gorilla/css v1.0.0/go.mod h1:Dn721qIggHpt4+EFCcTLTU/vk5ySda2ReITrtgBl60c= github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 h1:Mo9W14pwbO9VfRe+ygqZ8dFbPpoIK1HFrG/zjTuQ+nc= github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428/go.mod h1:uhpZMVGznybq1itEKXj6RYw9I71qK4kH+OGMjRC4KEo= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/leebenson/conform v0.0.0-20190822094432-4c55492f71d7 h1:hDa/0r3KxsOtjVdGeK55rUhlTOYsRrxY3eozNgMVk/o= 
-github.com/leebenson/conform v0.0.0-20190822094432-4c55492f71d7/go.mod h1:a8mcsW8FbNiGeH6aQUuWm1Ix64JZTLYWNHOooj7Pl4I= -github.com/leodido/go-urn v1.1.0 h1:Sm1gr51B1kKyfD2BlRcLSiEkffoG96g6TPv6eRoEiB8= -github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= +github.com/leebenson/conform v1.2.2 h1:B0Sd/uYB2ZjGW/qO+KgRq06KfWFN4mlbLassI1zH1a8= +github.com/leebenson/conform v1.2.2/go.mod h1:hjD6ozSpxmgkcRsR9G4V+6N8AhSbtlsQgnuLVLTQDhk= +github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= +github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/microcosm-cc/bluemonday v1.0.17 h1:Z1a//hgsQ4yjC+8zEkV8IWySkXnsxmdSY642CTFQb5Y= -github.com/microcosm-cc/bluemonday v1.0.17/go.mod h1:Z0r70sCuXHig8YpBzCc5eGHAap2K7e/u082ZUpDRRqM= +github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= +github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= +github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/microcosm-cc/bluemonday v1.0.18 
h1:6HcxvXDAi3ARt3slx6nTesbvorIc3QeTzBNRvWktHBo= +github.com/microcosm-cc/bluemonday v1.0.18/go.mod h1:Z0r70sCuXHig8YpBzCc5eGHAap2K7e/u082ZUpDRRqM= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/ngdinhtoan/glide-cleanup v0.2.0/go.mod h1:UQzsmiDOb8YV3nOsCxK/c9zPpCZVNoHScRE3EO9pVMM= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= -github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= -github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= -github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/urfave/cli v1.22.5 h1:lNq9sAHXK2qfdI8W+GRItjCEkI+2oR4d+MEHy1CKXoU= -github.com/urfave/cli v1.22.5/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= +github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/urfave/cli/v2 v2.4.0 h1:m2pxjjDFgDxSPtO8WSdbndj17Wu2y8vOT86wE/tjr+I= +github.com/urfave/cli/v2 v2.4.0/go.mod h1:NX9W0zmTvedE5oDoOMs2RTC8RvdK98NTYZE5LbaEYPg= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220412020605-290c469a71a5 h1:bRb386wvrE+oBNdF1d/Xh9mQrfQ4ecYhW5qJ5GvTGT4= +golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= +golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 
+golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -74,3 +104,6 @@ gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8 gopkg.in/go-playground/validator.v9 v9.31.0 h1:bmXmP2RSNtFES+bn4uYuHT7iJFJv7Vj+an+ZQdDaD1M= gopkg.in/go-playground/validator.v9 v9.31.0/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/utility/counter/counter.go b/internal/utility/counter/counter.go new file mode 100644 index 0000000..a037d36 --- /dev/null +++ b/internal/utility/counter/counter.go @@ -0,0 +1,30 @@ +package counter + +import ( + "github.com/cheggaaa/pb/v3" + "github.com/fatih/color" +) + +func New(title string, total int) *Counter { + color.New(color.Bold).Printf("\n%s\n", title) + + return &Counter{ + bar: pb.Full.Start(total), + } +} + +type Counter struct { + bar *pb.ProgressBar +} + +func (c *Counter) Increment() { + c.bar.Increment() +} + +func (c *Counter) Finish() { + if c.bar.Current() < 
c.bar.Total() { + c.bar.AddTotal(c.bar.Current() - c.bar.Total()) + } + + c.bar.Finish() +} diff --git a/internal/utility/fs.go b/internal/utility/fs.go index 4723f6a..14113a3 100644 --- a/internal/utility/fs.go +++ b/internal/utility/fs.go @@ -1,13 +1,56 @@ package utility import ( + "bytes" + "io" "os" + + "github.com/coralproject/coral-importer/internal/utility/counter" + "github.com/pkg/errors" ) -func Exists(name string) bool { - if _, err := os.Stat(name); os.IsNotExist(err) { +func Exists(fileName string) bool { + if _, err := os.Stat(fileName); os.IsNotExist(err) { return false } return true } + +func NewLineCounter(title, sourceFileName string) (*counter.Counter, error) { + lines, err := CountLines(sourceFileName) + if err != nil { + return nil, errors.Wrap(err, "could not count users file") + } + + return counter.New(title, lines), nil +} + +func CountLines(fileName string) (int, error) { + f, err := os.Open(fileName) + if err != nil { + return 0, errors.Wrap(err, "could not open file for reading") + } + defer f.Close() + + lines := 0 + // Should be the maximum size of a given lines worth of content. 
+ buf := make([]byte, 32*1024) + lineSep := []byte{'\n'} + + for { + c, err := f.Read(buf) + lines += bytes.Count(buf[:c], lineSep) + + switch { + case err == io.EOF: + if lines == 0 && c >= 0 { + return 0, errors.New("expected file to end with a newline") + } + + return lines, nil + case err != nil: + return lines, err + } + } +} diff --git a/internal/utility/json.go b/internal/utility/json.go index 905a5df..32db51c 100644 --- a/internal/utility/json.go +++ b/internal/utility/json.go @@ -4,32 +4,45 @@ import ( "bufio" "io" "os" + "runtime" + "sync" "github.com/mailru/easyjson" "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) -func NewJSONWriter(fileName string) (*JSONWriter, error) { - f, err := os.Create(fileName) +type Writer interface { + Write(doc easyjson.Marshaler) error + Close() error +} + +type nopJSONWriter struct{} + +func (d *nopJSONWriter) Write(doc easyjson.Marshaler) error { return nil } + +func (d *nopJSONWriter) Close() error { return nil } + +func NewJSONWriter(dryRun bool, fileName string) (Writer, error) { + if dryRun { + return &nopJSONWriter{}, nil + } + + dest, err := os.Create(fileName) if err != nil { return nil, errors.Wrap(err, "could not create file for writing") } - w := bufio.NewWriter(f) + w := bufio.NewWriter(dest) return &JSONWriter{ - f: f, - w: w, - filename: fileName, + f: dest, + w: w, }, nil } type JSONWriter struct { - f *os.File - w *bufio.Writer - documents uint64 - filename string + f io.WriteCloser + w *bufio.Writer } func (c *JSONWriter) Write(doc easyjson.Marshaler) error { @@ -37,12 +50,10 @@ func (c *JSONWriter) Write(doc easyjson.Marshaler) error { return errors.Wrap(err, "could not marshal output") } - if _, err := c.w.WriteString("\n"); err != nil { + if _, err := c.w.WriteRune('\n'); err != nil { return errors.Wrap(err, "could not write newline") } - c.documents++ - return nil } @@ -55,13 +66,52 @@ func (c *JSONWriter) Close() error { return errors.Wrap(err, "could not close file") } - 
logrus.WithField("documents", c.documents).WithField("fileName", c.filename).Info("wrote documents") - return nil } type JSONReaderFn func(line int, data []byte) error +type Line struct { + LineNumber int + Data []byte +} + +func ReadJSONConcurrently(fileName string, fn JSONReaderFn) error { + count := runtime.NumCPU() + ch := make(chan Line, count) + var wg sync.WaitGroup + + wg.Add(count) + for i := 0; i < count; i++ { + go func() { + for line := range ch { + if err := fn(line.LineNumber, line.Data); err != nil { + panic(err) + } + } + + wg.Done() + }() + } + + if err := ReadJSON(fileName, func(line int, data []byte) error { + + ch <- Line{ + LineNumber: line, + Data: data, + } + + return nil + }); err != nil { + return err + } + + close(ch) + wg.Wait() + + return nil +} + func ReadJSON(fileName string, fn JSONReaderFn) error { f, err := os.Open(fileName) if err != nil { @@ -89,6 +139,12 @@ func ReadJSON(fileName string, fn JSONReaderFn) error { // Increment the line count. lines++ + // We're reading JSON, and if the document is less than or equal to two + // characters there is no content to read! + if len(line) <= 2 { + continue + } + // Send the input to a processor. 
if err := fn(lines, []byte(line)); err != nil { return errors.Wrap(err, "could not operate on the line") diff --git a/internal/warnings/registry.go b/internal/warnings/registry.go new file mode 100644 index 0000000..7af7508 --- /dev/null +++ b/internal/warnings/registry.go @@ -0,0 +1,19 @@ +package warnings + +var all []*Warning + +func Every(fn func(warning *Warning)) { + for _, warning := range all { + fn(warning) + } +} + +func register(warning *Warning) { + all = append(all, warning) +} + +func init() { + register(UnsupportedDateFormat) + register(UnsupportedUserProfileProvider) + register(SSOIDMismatch) +} diff --git a/internal/warnings/time.go b/internal/warnings/time.go new file mode 100644 index 0000000..b1dbf0b --- /dev/null +++ b/internal/warnings/time.go @@ -0,0 +1,6 @@ +package warnings + +var ( + // When an unsupported date format is encountered, this warning is emitted. + UnsupportedDateFormat = NewWarning("UnsupportedDateFormat", "a date format was encountered and was not processed") +) diff --git a/internal/warnings/users.go b/internal/warnings/users.go new file mode 100644 index 0000000..8319868 --- /dev/null +++ b/internal/warnings/users.go @@ -0,0 +1,11 @@ +package warnings + +var ( + // When a user profile is found that is not supported, this warning is + // emitted. + UnsupportedUserProfileProvider = NewWarning("UnsupportedUserProfileProvider", "a user profile provider was encountered that was not supported, and was therefore skipped") + + // Whe a user profile is found that does not have the same ID as it's id, this + // warning is emitted. 
+ SSOIDMismatch = NewWarning("SSOIDMismatch", "a user profile was found that had a different ID from it's SSO ID") +) diff --git a/internal/warnings/warnings.go b/internal/warnings/warnings.go new file mode 100644 index 0000000..c886dac --- /dev/null +++ b/internal/warnings/warnings.go @@ -0,0 +1,69 @@ +package warnings + +import ( + "fmt" + "sync" + "sync/atomic" +) + +func NewWarning(name, description string) *Warning { + return &Warning{ + name: name, + description: description, + once: &sync.Once{}, + m: &sync.Map{}, + } +} + +type Warning struct { + name string + description string + + once *sync.Once + m *sync.Map + occurrences int32 +} + +// OnceWith will only allow the function to execute if the key has not been +// seen yet. +func (w *Warning) OnceWith(fn func(), key string) { + // Add to the number of occurrences. + atomic.AddInt32(&w.occurrences, 1) + + // Load the empty struct or store a new one. If it was loaded (and not + // stored) then we have already seen this key and it should be skipped. + if _, ok := w.m.LoadOrStore(key, struct{}{}); ok { + return + } + + fn() +} + +// Once will only allow the function to execute if it has not before. +func (w *Warning) Once(fn func()) { + // Add to the number of occurrences. 
+ atomic.AddInt32(&w.occurrences, 1) + + w.once.Do(fn) +} + +func (w *Warning) String() string { + return fmt.Sprintf("%s: %s", w.name, w.description) +} + +func (w *Warning) Occurrences() int32 { + return atomic.LoadInt32(&w.occurrences) +} + +func (w *Warning) Keys() []string { + keys := make([]string, 0) + w.m.Range(func(key, value interface{}) bool { + if str, ok := key.(string); ok { + keys = append(keys, str) + } + + return true + }) + + return keys +} diff --git a/main.go b/main.go index bfb0973..baae9a1 100644 --- a/main.go +++ b/main.go @@ -1,17 +1,21 @@ package main import ( + "context" "fmt" + "io" "os" + "sync" + "time" - "github.com/coralproject/coral-importer/common" "github.com/coralproject/coral-importer/common/coral" + "github.com/coralproject/coral-importer/internal/warnings" "github.com/coralproject/coral-importer/strategies/csv" "github.com/coralproject/coral-importer/strategies/legacy" - "github.com/coralproject/coral-importer/strategies/livefyre" - "github.com/pkg/errors" + "github.com/coralproject/coral-importer/strategies/legacy/mapper" + "github.com/fatih/color" "github.com/sirupsen/logrus" - "github.com/urfave/cli" + "github.com/urfave/cli/v2" ) var ( @@ -28,124 +32,131 @@ const ( ) func main() { + start := time.Now() + + // Configure the writer for the logger. We'll set this in the before hook of + // the CLI. 
+ ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + var logFile io.Closer + defer func() { + if logFile == nil { + return + } + + cancel() + wg.Wait() + logFile.Close() + }() + app := cli.NewApp() app.Name = "github.com/coralproject/coral-importer" app.Usage = "imports comment exports from other providers into Coral" app.Version = fmt.Sprintf("%v, commit %v, built at %v against migration %d", version, commit, date, CurrentMigrationVersion) app.Flags = []cli.Flag{ - cli.BoolFlag{ - Name: "quiet", - Usage: "make output quieter", - }, - cli.BoolFlag{ - Name: "json", - Usage: "output logs in JSON", + &cli.Int64Flag{ + Name: "migrationID", + EnvVars: []string{"CORAL_MIGRATION_ID"}, + Usage: "ID of the most recent migration associated with your installation", }, - cli.Int64Flag{ - Name: "migrationID", - Usage: "ID of the most recent migration associated with your installation", + &cli.StringFlag{ + Name: "log", + EnvVars: []string{"CORAL_LOG"}, + Required: true, + Usage: "output directory for where the logs will be written to", }, - cli.BoolFlag{ + &cli.BoolFlag{ Name: "forceSkipMigrationCheck", Usage: "used to skip the migration version check", }, - cli.BoolFlag{ + &cli.BoolFlag{ Name: "disableMonotonicCursorTimes", Usage: "used to disable monotonic cursor times which adds a offset to the same times to ensure all emitted times are unique", }, + &cli.DurationFlag{ + Name: "memoryStatFrequency", + Usage: "specify the frequency of measurements of memory usage, default is never", + }, } app.Before = func(c *cli.Context) error { // Configure the logger. 
- if err := common.ConfigureLogger(c); err != nil { - return errors.Wrap(err, "could not configure logger") + logrus.SetLevel(logrus.DebugLevel) + logrus.SetFormatter(&logrus.JSONFormatter{}) + + f, err := os.OpenFile(c.String("log"), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + return cli.Exit(err.Error(), 1) + } + logFile = f + + logrus.SetOutput(f) + + memoryStatFrequency := c.Duration("memoryStatFrequency") + if memoryStatFrequency > 0 { + wg.Add(1) + go func() { + defer wg.Done() + + StartLoggingMemoryStats(ctx, memoryStatFrequency) + }() } // Check that the imported needs updating. - if c.GlobalBool("forceSkipMigrationCheck") { + if c.Bool("forceSkipMigrationCheck") { logrus.Warn("skipping migration check") - } else if c.GlobalInt64("migrationID") != CurrentMigrationVersion { + } else if c.Int64("migrationID") != CurrentMigrationVersion { logrus.WithFields(logrus.Fields{ - "migrationID": c.GlobalInt("migrationID"), + "migrationID": c.Int("migrationID"), "currentMigrationVersion": CurrentMigrationVersion, }).Fatal("migration version mismatch, update importer to support new migrations or skip with --forceSkipMigrationCheck") } // Add support for the monotonic cursor times if not disabled. 
- if c.GlobalBool("disableMonotonicCursorTimes") { + if c.Bool("disableMonotonicCursorTimes") { logrus.Warn("monotonic cursor times are disabled, some entries may have duplicate cursor times") } else { logrus.Info("monotonic cursor times are enabled, cursor times will be offset automatically") coral.EnableMonotonicCursorTime() } + color.New(color.Bold).Printf("coral-importer (%s)\n", c.App.Version) + return nil } - app.Commands = []cli.Command{ + app.Commands = []*cli.Command{ { Name: "csv", Usage: "a migrator designed to migrate data from the standardized CSV format", Action: csv.CLI, Flags: []cli.Flag{ - cli.StringFlag{ + &cli.StringFlag{ Name: "tenantID", Usage: "ID of the Tenant to import for", Required: true, }, - cli.StringFlag{ + &cli.StringFlag{ Name: "siteID", Usage: "ID of the Site to import for", Required: true, }, - cli.StringFlag{ + &cli.StringFlag{ Name: "auth", Usage: "type of profile to emit (One of \"sso\" or \"local\")", Value: "sso", }, - cli.StringFlag{ + &cli.StringFlag{ Name: "input", Usage: "folder where the CSV input files are located", Required: true, }, - cli.StringFlag{ + &cli.StringFlag{ Name: "output", Usage: "folder where the outputted mongo files should be placed", Required: true, }, - }, - }, - { - Name: "livefyre", - Usage: "a migrator designed to migrate data from the LiveFyre platform", - Action: livefyre.CLI, - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "tenantID", - Usage: "ID of the Tenant to import for", - Required: true, - }, - cli.StringFlag{ - Name: "siteID", - Usage: "ID of the Site to import for", - Required: true, - }, - cli.BoolFlag{ - Name: "sso", - Usage: "when true, enables adding the SSO profile to generated users with the ID of the User", - }, - cli.StringFlag{ - Name: "comments", - Usage: "newline separated JSON input file containing comments", - Required: true, - }, - cli.StringFlag{ - Name: "users", - Usage: "newline separated JSON input file containing users", - Required: true, - }, - cli.StringFlag{ - Name: 
"output", - Usage: "folder where the outputted mongo files should be placed", - Required: true, + &cli.BoolFlag{ + Name: "dryRun", + Usage: "processes data to validate inputs without actually writing files", }, }, }, @@ -153,37 +164,89 @@ func main() { Name: "legacy", Usage: "a migrator designed to import data from previous versions of Coral", Action: legacy.CLI, + Subcommands: []*cli.Command{ + { + Name: "map", + Usage: "perform mapping of legacy fields into importable files", + Action: mapper.CLI, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "post", + EnvVars: []string{"CORAL_MAPPER_POST_DIRECTORY"}, + Usage: "directory to write files that have been processed by the mapper", + Required: true, + }, + }, + }, + }, Flags: []cli.Flag{ - cli.StringFlag{ + &cli.StringFlag{ Name: "tenantID", + EnvVars: []string{"CORAL_TENANT_ID"}, Usage: "ID of the Tenant to import for", Required: true, }, - cli.StringFlag{ + &cli.StringFlag{ Name: "siteID", + EnvVars: []string{"CORAL_SITE_ID"}, Usage: "ID of the Site to import for", Required: true, }, - cli.StringFlag{ + &cli.StringFlag{ Name: "preferredPerspectiveModel", Usage: "the preferred model to use for copying over toxicity scores", Value: "SEVERE_TOXICITY", }, - cli.StringFlag{ + &cli.StringFlag{ Name: "input", + EnvVars: []string{"CORAL_INPUT_DIRECTORY"}, Usage: "folder where the output from mongoexport is located, separated into collection named JSON files", Required: true, }, - cli.StringFlag{ + &cli.StringFlag{ Name: "output", + EnvVars: []string{"CORAL_OUTPUT_DIRECTORY"}, Usage: "folder where the outputted mongo files should be placed", Required: true, }, + &cli.BoolFlag{ + Name: "dryRun", + Usage: "processes data to validate inputs without actually writing files", + }, }, }, } if err := app.Run(os.Args); err != nil { + color.New(color.Bold, color.FgRed).Println(err.Error()) logrus.WithError(err).Fatal() } + + warnings.Every(func(warning *warnings.Warning) { + occurrences := warning.Occurrences() + if occurrences == 0 
{ + return + } + + logrus.WithFields(logrus.Fields{ + "warning": warning.String(), + "occurrences": occurrences, + "keys": warning.Keys(), + }).Warn("warning occurred") + }) + + profiles := warnings.UnsupportedUserProfileProvider.Keys() + if len(profiles) > 1 { + logrus.WithFields(logrus.Fields{ + "profiles": profiles, + }).Warn("multiple forign user profiles found, multiple passes of mapper required") + } else if len(profiles) == 1 { + logrus.WithFields(logrus.Fields{ + "profiles": profiles, + }).Warn("forign user profile found, mapper required") + } + + took := time.Since(start) + color.New(color.Bold, color.FgGreen).Printf("\nCompleted, took %s\n", took) + logrus.WithField("took", took).Info("completed") } diff --git a/memory.go b/memory.go new file mode 100644 index 0000000..54cb66f --- /dev/null +++ b/memory.go @@ -0,0 +1,42 @@ +package main + +import ( + "context" + "runtime" + "time" + + "github.com/sirupsen/logrus" +) + +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +} + +func LogMemoryStats(run int64) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + + logrus.WithFields(logrus.Fields{ + "run": run, + "alloc": bToMb(m.Alloc), + "totalAlloc": bToMb(m.TotalAlloc), + "sys": bToMb(m.Sys), + "numGC": m.NumGC, + }).Debug("memory stats") +} + +func StartLoggingMemoryStats(ctx context.Context, frequency time.Duration) { + run := time.Now().Unix() + + LogMemoryStats(run) + + for { + select { + case <-time.After(frequency): + LogMemoryStats(run) + case <-ctx.Done(): + LogMemoryStats(run) + return + } + } +} diff --git a/strategies/csv/csv.go b/strategies/csv/csv.go index b83f32c..c2af8e6 100644 --- a/strategies/csv/csv.go +++ b/strategies/csv/csv.go @@ -11,12 +11,12 @@ import ( "github.com/coralproject/coral-importer/common/coral" "github.com/coralproject/coral-importer/internal/utility" "github.com/coralproject/coral-importer/strategies" - "github.com/urfave/cli" "github.com/microcosm-cc/bluemonday" "github.com/pkg/errors" uuid 
"github.com/satori/go.uuid" "github.com/sirupsen/logrus" + "github.com/urfave/cli/v2" ) type CommentReference struct { @@ -54,6 +54,10 @@ func Import(c strategies.Context) error { // from the MongoDB export. input := c.String("input") + // dryRun indicates that the strategy should not write files and is used for + // validation. + dryRun := c.Bool("dryRun") + // auth is the identifier for the type of authentication profiles to be // created for the users. auth := c.String("auth") @@ -181,6 +185,10 @@ func Import(c strategies.Context) error { // Reconstruct all the family relationships from the parentID map. reconstructor := common.NewReconstructor() for commentID, comment := range comments { + if comment.ParentID == "" { + continue + } + reconstructor.AddIDs(commentID, comment.ParentID) } @@ -189,7 +197,7 @@ func Import(c strategies.Context) error { startedCommentsAt := time.Now() logrus.Debug("processing comments") - commentsWriter, err := utility.NewJSONWriter(commentsOutputFileName) + commentsWriter, err := utility.NewJSONWriter(dryRun, commentsOutputFileName) if err != nil { return errors.Wrap(err, "could not create comment writer") } @@ -296,7 +304,7 @@ func Import(c strategies.Context) error { startedUsersAt := time.Now() logrus.Debug("processing users") - usersWriter, err := utility.NewJSONWriter(usersOutputFileName) + usersWriter, err := utility.NewJSONWriter(dryRun, usersOutputFileName) if err != nil { return errors.Wrap(err, "could not create users writer") } @@ -414,7 +422,7 @@ func Import(c strategies.Context) error { startedStoriesAt := time.Now() logrus.Debug("processing stories") - storiesWriter, err := utility.NewJSONWriter(storiesOutputFileName) + storiesWriter, err := utility.NewJSONWriter(dryRun, storiesOutputFileName) if err != nil { return errors.Wrap(err, "could not create story writer") } diff --git a/strategies/csv/mapping.go b/strategies/csv/mapping.go index 52aad50..bf7e71b 100644 --- a/strategies/csv/mapping.go +++ 
b/strategies/csv/mapping.go @@ -36,7 +36,7 @@ type Comment struct { CreatedAt string `conform:"trim" validate:"required"` Body string `conform:"trim" validate:"required_without=Rating"` ParentID string `conform:"trim"` - Status string `conform:"trim" validate:"omitempty,oneof= APPROVED REJECTED NONE"` + Status string `conform:"trim,intern" validate:"omitempty,oneof= APPROVED REJECTED NONE"` Rating int } @@ -81,7 +81,7 @@ type Story struct { Author string `conform:"trim"` PublishedAt string `conform:"trim"` ClosedAt string `conform:"trim"` - Mode string `conform:"trim,upper" validate:"omitempty,oneof= COMMENTS QA RATINGS_AND_REVIEWS"` + Mode string `conform:"trim,upper,intern" validate:"omitempty,oneof= COMMENTS QA RATINGS_AND_REVIEWS"` } // ParseStory will extract a Story from the fields and perform validation on the diff --git a/strategies/legacy/context.go b/strategies/legacy/context.go index 908d638..9295424 100644 --- a/strategies/legacy/context.go +++ b/strategies/legacy/context.go @@ -2,10 +2,14 @@ package legacy import ( "path/filepath" + "sync" "github.com/coralproject/coral-importer/common" "github.com/coralproject/coral-importer/common/coral" "github.com/coralproject/coral-importer/strategies" + + "github.com/fatih/color" + "github.com/sirupsen/logrus" ) type CommentRef struct { @@ -41,7 +45,19 @@ func NewContext(c strategies.Context) *Context { // from the MongoDB export. input := c.String("input") + // dryRun indicates that the strategy should not write files and is used for + // validation. 
+ dryRun := c.Bool("dryRun") + + if dryRun { + color.New(color.Bold, color.FgRed).Println("--dryRun is enabled, files will not be written") + logrus.Warn("dry run is enabled, files will not be written") + } + + var mutex sync.Mutex + return &Context{ + DryRun: dryRun, TenantID: tenantID, SiteID: siteID, Filenames: Filenames{ @@ -59,6 +75,7 @@ func NewContext(c strategies.Context) *Context { }, }, Reconstructor: common.NewReconstructor(), + Mutex: &mutex, users: map[string]*UserRef{}, stories: map[string]*StoryRef{}, comments: map[string]*CommentRef{}, @@ -85,10 +102,12 @@ type Filenames struct { } type Context struct { + DryRun bool TenantID string SiteID string Filenames Filenames Reconstructor *common.Reconstructor + Mutex *sync.Mutex users map[string]*UserRef stories map[string]*StoryRef diff --git a/strategies/legacy/legacy.go b/strategies/legacy/legacy.go index b8f1b9b..e43e05b 100644 --- a/strategies/legacy/legacy.go +++ b/strategies/legacy/legacy.go @@ -7,11 +7,14 @@ import ( "github.com/coralproject/coral-importer/common" "github.com/coralproject/coral-importer/internal/utility" + "github.com/coralproject/coral-importer/internal/utility/counter" "github.com/coralproject/coral-importer/strategies" + + "github.com/josharian/intern" easyjson "github.com/mailru/easyjson" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "github.com/urfave/cli" + "github.com/urfave/cli/v2" ) // PreferredPerspectiveModel is the stored preferred perspective model that @@ -59,7 +62,7 @@ func Import(c strategies.Context) error { logrus.WithField("comments", len(ctx.comments)).Debug("finished loading comments into map") - if err := ProcessCommentActions(ctx); err != nil { + if err := WriteCommentActions(ctx); err != nil { return errors.Wrap(err, "could not process comment actions") } @@ -68,15 +71,12 @@ func Import(c strategies.Context) error { startedReconstructionAt := time.Now() logrus.Debug("reconstructing families based on parent id map") - // Reconstruct all the family 
relationships from the parentID map. - for commentID, comment := range ctx.comments { - ctx.Reconstructor.AddIDs(commentID, comment.ParentID) - } + ReconstructFamilies(ctx) logrus.WithField("took", time.Since(startedReconstructionAt).String()).Debug("finished family reconstruction") // Load all the comments in from the comments.json file. - if err := ProcessComments(ctx); err != nil { + if err := WriteComments(ctx); err != nil { return errors.Wrap(err, "could not read comments json") } @@ -85,7 +85,7 @@ func Import(c strategies.Context) error { runtime.GC() // Process the stories now. - if err := ProcessStories(ctx); err != nil { + if err := WriteStories(ctx); err != nil { return errors.Wrap(err, "could not process stories") } @@ -93,19 +93,27 @@ func Import(c strategies.Context) error { ctx.ReleaseStories() runtime.GC() - if err := ProcessUsers(ctx); err != nil { + if err := WriteUsers(ctx); err != nil { return errors.Wrap(err, "could not process users") } // Mark when we finished. finished := time.Now() - logrus.WithField("took", finished.Sub(started).String()).Info("finished processing") + logrus.WithField("took", finished.Sub(started).String()).Debug("finished processing") return nil } func SeedCommentsReferences(ctx *Context) error { - return utility.ReadJSON(ctx.Filenames.Input.Comments, func(line int, data []byte) error { + bar, err := utility.NewLineCounter("(1/6) Loading Comments", ctx.Filenames.Input.Comments) + if err != nil { + return errors.Wrap(err, "could not count actions file") + } + defer bar.Finish() + + return utility.ReadJSONConcurrently(ctx.Filenames.Input.Comments, func(line int, data []byte) error { + defer bar.Increment() + var in Comment if err := easyjson.Unmarshal(data, &in); err != nil { logrus.WithField("line", line).Error(err) @@ -120,8 +128,11 @@ func SeedCommentsReferences(ctx *Context) error { return nil } + ctx.Mutex.Lock() + defer ctx.Mutex.Unlock() + ref, _ := ctx.FindOrCreateComment(in.ID) - ref.Status = 
TranslateCommentStatus(in.Status) + ref.Status = intern.String(TranslateCommentStatus(in.Status)) ref.StoryID = in.AssetID if in.ParentID != nil { ref.ParentID = *in.ParentID @@ -131,14 +142,38 @@ func SeedCommentsReferences(ctx *Context) error { }) } -func ProcessCommentActions(ctx *Context) error { - commentActionsWriter, err := utility.NewJSONWriter(ctx.Filenames.Output.CommentActions) +func ReconstructFamilies(ctx *Context) { + bar := counter.New("(3/6) Reconstructing Families", len(ctx.comments)) + defer bar.Finish() + + // Reconstruct all the family relationships from the parentID map. + for commentID, comment := range ctx.comments { + bar.Increment() + + if comment.ParentID == "" { + continue + } + + ctx.Reconstructor.AddIDs(commentID, comment.ParentID) + } +} + +func WriteCommentActions(ctx *Context) error { + commentActionsWriter, err := utility.NewJSONWriter(ctx.DryRun, ctx.Filenames.Output.CommentActions) if err != nil { return errors.Wrap(err, "could not create commentActionsWriter") } defer commentActionsWriter.Close() - return utility.ReadJSON(ctx.Filenames.Input.Actions, func(line int, data []byte) error { + bar, err := utility.NewLineCounter("(2/6) Writing Comment Actions", ctx.Filenames.Input.Actions) + if err != nil { + return errors.Wrap(err, "could not count actions file") + } + defer bar.Finish() + + return utility.ReadJSONConcurrently(ctx.Filenames.Input.Actions, func(line int, data []byte) error { + defer bar.Increment() + // Parse the Action from the file. var in Action if err := easyjson.Unmarshal(data, &in); err != nil { @@ -149,8 +184,6 @@ func ProcessCommentActions(ctx *Context) error { // Ignore the action if it's not a comment action. 
if in.ItemType != "COMMENTS" { - logrus.WithField("line", line).Warn("skipping non-comment action") - return nil } @@ -170,13 +203,19 @@ func ProcessCommentActions(ctx *Context) error { action.StoryID = ref.StoryID + ctx.Mutex.Lock() + defer ctx.Mutex.Unlock() + story, _ := ctx.FindOrCreateStory(ref.StoryID) - ref.ActionCounts[action.ActionType]++ - story.ActionCounts[action.ActionType]++ + actionType := intern.String(action.ActionType) + + ref.ActionCounts[actionType]++ + story.ActionCounts[actionType]++ if action.ActionType == "FLAG" { - ref.ActionCounts[action.ActionType+"__"+action.Reason]++ - story.ActionCounts[action.ActionType+"__"+action.Reason]++ + reason := intern.String(action.ActionType + "__" + action.Reason) + ref.ActionCounts[reason]++ + story.ActionCounts[reason]++ } if err := commentActionsWriter.Write(action); err != nil { @@ -187,14 +226,22 @@ func ProcessCommentActions(ctx *Context) error { }) } -func ProcessComments(ctx *Context) error { - commentsWriter, err := utility.NewJSONWriter(ctx.Filenames.Output.Comments) +func WriteComments(ctx *Context) error { + commentsWriter, err := utility.NewJSONWriter(ctx.DryRun, ctx.Filenames.Output.Comments) if err != nil { return errors.Wrap(err, "could not create comments writer") } defer commentsWriter.Close() - return utility.ReadJSON(ctx.Filenames.Input.Comments, func(line int, data []byte) error { + bar, err := utility.NewLineCounter("(4/6) Writing Comments", ctx.Filenames.Input.Comments) + if err != nil { + return errors.Wrap(err, "could not count comments file") + } + defer bar.Finish() + + return utility.ReadJSONConcurrently(ctx.Filenames.Input.Comments, func(line int, data []byte) error { + defer bar.Increment() + // Parse the Comment from the file. 
var in Comment if err := easyjson.Unmarshal(data, &in); err != nil { @@ -226,6 +273,9 @@ func ProcessComments(ctx *Context) error { comment.ChildCount = len(comment.ChildIDs) comment.AncestorIDs = ctx.Reconstructor.GetAncestors(comment.ID) + ctx.Mutex.Lock() + defer ctx.Mutex.Unlock() + user, _ := ctx.FindOrCreateUser(comment.AuthorID) user.StatusCounts.Increment(comment.Status, 1) @@ -246,15 +296,22 @@ func ProcessComments(ctx *Context) error { }) } -func ProcessStories(ctx *Context) error { - storiesWriter, err := utility.NewJSONWriter(ctx.Filenames.Output.Stories) - // storiesWriter, err := utility.NewJSONWriter(storiesOutputFilename) +func WriteStories(ctx *Context) error { + storiesWriter, err := utility.NewJSONWriter(ctx.DryRun, ctx.Filenames.Output.Stories) if err != nil { return errors.Wrap(err, "could not create stories writer") } defer storiesWriter.Close() - return utility.ReadJSON(ctx.Filenames.Input.Assets, func(line int, data []byte) error { + bar, err := utility.NewLineCounter("(5/6) Writing Stories", ctx.Filenames.Input.Assets) + if err != nil { + return errors.Wrap(err, "could not count assets file") + } + defer bar.Finish() + + return utility.ReadJSONConcurrently(ctx.Filenames.Input.Assets, func(line int, data []byte) error { + defer bar.Increment() + // Parse the asset from the file. var in Asset if err := easyjson.Unmarshal(data, &in); err != nil { @@ -265,6 +322,8 @@ func ProcessStories(ctx *Context) error { story := TranslateAsset(ctx.TenantID, ctx.SiteID, &in) + // Locking isn't needed as each of these stories is unique. + if ref, ok := ctx.FindStory(story.ID); ok { // Get the status counts for this story. 
story.CommentCounts.Status = ref.StatusCounts @@ -287,6 +346,9 @@ func ProcessStories(ctx *Context) error { story.CommentCounts.ModerationQueue.Queues.Reported += ref.Flagged } + ctx.Mutex.Lock() + defer ctx.Mutex.Unlock() + if err := storiesWriter.Write(story); err != nil { return errors.Wrap(err, "couldn't write out story") } @@ -295,15 +357,22 @@ func ProcessStories(ctx *Context) error { }) } -func ProcessUsers(ctx *Context) error { - // usersWriter, err := utility.NewJSONWriter(usersOutputFilename) - usersWriter, err := utility.NewJSONWriter(ctx.Filenames.Output.Users) +func WriteUsers(ctx *Context) error { + usersWriter, err := utility.NewJSONWriter(ctx.DryRun, ctx.Filenames.Output.Users) if err != nil { return errors.Wrap(err, "could not create users writer") } defer usersWriter.Close() - return utility.ReadJSON(ctx.Filenames.Input.Users, func(line int, data []byte) error { + bar, err := utility.NewLineCounter("(6/6) Writing Users", ctx.Filenames.Input.Users) + if err != nil { + return errors.Wrap(err, "could not count users file") + } + defer bar.Finish() + + return utility.ReadJSONConcurrently(ctx.Filenames.Input.Users, func(line int, data []byte) error { + defer bar.Increment() + // Parse the user from the file. var in User if err := easyjson.Unmarshal(data, &in); err != nil { @@ -314,11 +383,16 @@ func ProcessUsers(ctx *Context) error { user := TranslateUser(ctx.TenantID, &in) + // Locking isn't needed as each of these users is unique. + // Get the status counts for this story. 
if ref, ok := ctx.FindUser(user.ID); ok { user.CommentCounts.Status = ref.StatusCounts } + ctx.Mutex.Lock() + defer ctx.Mutex.Unlock() + if err := usersWriter.Write(user); err != nil { return errors.Wrap(err, "couldn't write out user") } diff --git a/strategies/legacy/mapper/cli.go b/strategies/legacy/mapper/cli.go new file mode 100644 index 0000000..122118b --- /dev/null +++ b/strategies/legacy/mapper/cli.go @@ -0,0 +1,53 @@ +package mapper + +import ( + "path/filepath" + + "github.com/fatih/color" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/urfave/cli/v2" +) + +// CLI is the actual task ran when running this tool. +func CLI(c *cli.Context) error { + // input is the name of the folder where we are loading out collections + // from the MongoDB export. + input := c.String("input") + + // output is the name of the folder where there is the files that have already + // been processed by the importer. + output := c.String("output") + + // post is the name of the folder where we are placing our outputted dumps + // ready for MongoDB import. + post := c.String("post") + + // dryRun indicates that the strategy should not write files and is used for + // validation. + dryRun := c.Bool("dryRun") + + if dryRun { + color.New(color.Bold, color.FgRed).Println("--dryRun is enabled, files will not be written") + logrus.Warn("dry run is enabled, files will not be written") + } + + m := New(dryRun) + + // Load the configuration and compile the replacement expressions. + if err := m.LoadConfig(); err != nil { + return errors.Wrap(err, "could not load the config") + } + + // Load all the updates for users in the --pre file. + if err := m.Pre(filepath.Join(input, "users.json")); err != nil { + return errors.Wrap(err, "could not load the pre users") + } + + // Process all the updates to the post file. 
+ if err := m.Post(filepath.Join(output, "users.json"), filepath.Join(post, "users.json")); err != nil { + return errors.Wrap(err, "could not load the post users") + } + + return nil +} diff --git a/strategies/legacy/mapper/getter.go b/strategies/legacy/mapper/getter.go new file mode 100644 index 0000000..571c1f5 --- /dev/null +++ b/strategies/legacy/mapper/getter.go @@ -0,0 +1,48 @@ +package mapper + +import "strings" + +func newGetter(path string) *getter { + parts := strings.Split(path, ".") + return &getter{ + len: len(parts), + parts: parts, + } +} + +type getter struct { + len int + parts []string +} + +func (g *getter) Get(obj map[string]interface{}) (string, bool) { + current := obj + + for depth, part := range g.parts { + element, ok := current[part] + if !ok { + return "", false + } + + // If we're at the end, then we should check that the element is a string. + if depth+1 == g.len { + value, ok := element.(string) + if !ok { + return "", false + } + + return value, true + } + + // Otherwise, we should check to see that this is a map. + value, ok := element.(map[string]interface{}) + if !ok { + return "", false + } + + // And advance the pointer. + current = value + } + + return "", false +} diff --git a/strategies/legacy/mapper/mapper.go b/strategies/legacy/mapper/mapper.go new file mode 100644 index 0000000..44d15a7 --- /dev/null +++ b/strategies/legacy/mapper/mapper.go @@ -0,0 +1,253 @@ +package mapper + +import ( + "encoding/json" + + "github.com/coralproject/coral-importer/common/coral" + "github.com/coralproject/coral-importer/internal/utility" + "github.com/coralproject/coral-importer/internal/warnings" + "github.com/kelseyhightower/envconfig" + "github.com/mailru/easyjson" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type User map[string]interface{} + +// Update describes an update for a User. +type Update struct { + // Username is the new username that should be associated with the user. 
+ Username string + + // Email is the new email that should be associated with the user. + Email string + + // SSO is the ID of the SSO user profile that should be associated with the + // user. + SSO string +} + +type Operation func(pre *User, update *Update) bool + +func New(dryRun bool) *Mapper { + return &Mapper{ + dryRun: dryRun, + updates: make(map[string]Update), + } +} + +type Mapper struct { + dryRun bool + operations []Operation + updates map[string]Update +} + +// LoadConfig will load the configuration from the specified file. +func (m *Mapper) LoadConfig() error { + var config struct { + Users struct { + Username string `envconfig:"username"` + SSO struct { + ID string `envconfig:"id"` + Provider string `envconfig:"provider"` + Email string `envconfig:"email"` + } `envconfig:"sso"` + } `envconfig:"users"` + } + + if err := envconfig.Process("CORAL_MAPPER", &config); err != nil { + return errors.Wrap(err, "could not load config ") + } + + if config.Users.Username != "" { + getter := newGetter(config.Users.Username) + m.operations = append(m.operations, func(pre *User, update *Update) bool { + // Try to get the username from the pre user. + username, ok := getter.Get(*pre) + if !ok { + return false + } + + update.Username = username + return true + }) + } + + if config.Users.SSO.ID != "" { + getter := newGetter(config.Users.SSO.ID) + m.operations = append(m.operations, func(pre *User, update *Update) bool { + // Try to get the SSO ID from the pre user. + id, ok := getter.Get(*pre) + if !ok { + return false + } + + update.SSO = id + + return true + }) + } else if config.Users.SSO.Provider != "" { + var getter *getter + + // If the SSO Email is enabled, setup the getter for it. + if config.Users.SSO.Email != "" { + getter = newGetter(config.Users.SSO.Email) + } + + m.operations = append(m.operations, func(pre *User, update *Update) bool { + // Try to get the profiles from the user. 
+ profiles, ok := (*pre)["profiles"].([]interface{}) + if !ok { + return false + } + + // Iterate over the profiles to find the profile with the specified + // provider. + for _, profile := range profiles { + p, ok := profile.(map[string]interface{}) + if !ok { + return false + } + + provider, ok := p["provider"].(string) + if !ok { + return false + } + + if provider != config.Users.SSO.Provider { + continue + } + + // Looks like we found the provider! Get the ID from the provider. + id, ok := p["id"].(string) + if !ok { + return false + } + + update.SSO = id + + // If we have the email getter enabled... Then also check for that! + if getter == nil { + return true + } + + email, ok := getter.Get(p) + if !ok { + return true + } + + update.Email = email + + return true + } + + return false + }) + } else if config.Users.SSO.Email != "" { + return errors.New("must specify users.sso.provider when specifying users.sso.email") + } + + return nil +} + +func (m *Mapper) Pre(input string) error { + bar, err := utility.NewLineCounter("(1/2) Computing User Updates", input) + if err != nil { + return errors.Wrap(err, "could not count input users file") + } + defer bar.Finish() + + return utility.ReadJSON(input, func(line int, data []byte) error { + defer bar.Increment() + + var user User + if err := json.Unmarshal(data, &user); err != nil { + return errors.Wrap(err, "could not load a user in the --pre file") + } + + id, ok := user["id"].(string) + if !ok { + return errors.New("could not get the user id from a user in --pre file") + } + + var updated bool + + var update Update + for _, operation := range m.operations { + updated = operation(&user, &update) || updated + } + + if !updated { + return nil + } + + m.updates[id] = update + + return nil + }) +} + +func (m *Mapper) Post(output, post string) error { + writer, err := utility.NewJSONWriter(m.dryRun, post) + if err != nil { + return errors.Wrap(err, "could not create writer") + } + defer writer.Close() + + bar, err := 
utility.NewLineCounter("(2/2) Writing User Updates", output) + if err != nil { + return errors.Wrap(err, "could not count output users file") + } + defer bar.Finish() + + return utility.ReadJSON(output, func(line int, data []byte) error { + defer bar.Increment() + + var user coral.User + if err := easyjson.Unmarshal(data, &user); err != nil { + return errors.Wrap(err, "could not load a user") + } + + // Check to see if we have updates for this user. + update, ok := m.updates[user.ID] + if !ok { + if err := writer.Write(user); err != nil { + return errors.Wrap(err, "could not write user") + } + + return nil + } + + if update.SSO != "" { + // Check that the user's ID matches the SSO ID. + if update.SSO != user.ID { + warnings.SSOIDMismatch.Once(func() { + logrus.WithField("id", user.ID).Warn("found a user that had an SSO ID that did not match their User ID") + }) + } + + // Add the new profile. + user.Profiles = append(user.Profiles, coral.UserProfile{ + ID: update.SSO, + Type: "sso", + }) + } + + if update.Email != "" { + user.Email = update.Email + } + + if update.Username != "" { + user.Username = update.Username + } + + // Remove the update. + delete(m.updates, user.ID) + + if err := writer.Write(user); err != nil { + return errors.Wrap(err, "could not write user") + } + + return nil + }) +} diff --git a/strategies/legacy/models.go b/strategies/legacy/models.go index 540c502..2d6f91f 100644 --- a/strategies/legacy/models.go +++ b/strategies/legacy/models.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/coralproject/coral-importer/common/coral" + "github.com/coralproject/coral-importer/internal/warnings" uuid "github.com/satori/go.uuid" "github.com/sirupsen/logrus" @@ -14,10 +15,10 @@ import ( // Action is the Action as exported from MongoDB from legacy Talk. 
type Action struct { ID string `json:"id"` - ActionType string `json:"action_type"` - GroupID string `json:"group_id"` + ActionType string `json:"action_type,intern"` + GroupID string `json:"group_id,intern"` ItemID string `json:"item_id"` - ItemType string `json:"item_type"` + ItemType string `json:"item_type,intern"` UserID *string `json:"user_id"` Metadata map[string]interface{} `json:"metadata"` CreatedAt coral.Time `json:"created_at"` @@ -47,7 +48,6 @@ func TranslateCommentAction(tenantID, siteID string, action *Action) *coral.Comm commentAction.Reason = "COMMENT_DETECTED_RECENT_HISTORY" case "TOXIC_COMMENT": commentAction.Reason = "COMMENT_DETECTED_TOXIC" - case "": default: } @@ -87,7 +87,7 @@ type CommentBodyHistory struct { type CommentTag struct { AssignedBy *string `json:"assigned_by"` Tag struct { - Name string `json:"name"` + Name string `json:"name,intern"` } `json:"tag"` CreatedAt coral.Time `json:"created_at"` } @@ -95,10 +95,10 @@ type CommentTag struct { type Comment struct { ID string `json:"id"` AssetID string `json:"asset_id"` - Status string `json:"status"` + Status string `json:"status,intern"` StatusHistory []struct { AssignedBy *string `json:"assigned_by"` - Type string `json:"type"` + Type string `json:"type,intern"` CreatedAt coral.Time `json:"created_at"` } `json:"status_history"` Metadata *struct { @@ -297,7 +297,7 @@ func TranslateAsset(tenantID, siteID string, asset *Asset) *coral.Story { type UserProfile struct { ID string `json:"id"` - Provider string `json:"provider"` + Provider string `json:"provider,intern"` } type UserToken struct { @@ -327,17 +327,17 @@ type UserMetadata struct { type User struct { ID string `json:"id"` Username string `json:"username"` - Role string `json:"role"` + Role string `json:"role,intern"` Password string `json:"password"` IgnoredUsers []string `json:"ignoresUsers"` Profiles []UserProfile `json:"profiles"` Tokens []UserToken `json:"tokens"` Status struct { Username struct { - Status string 
`json:"status"` + Status string `json:"status,intern"` History []struct { AssignedBy *string `json:"assigned_by"` - Status string `json:"status"` + Status string `json:"status,intern"` CreatedAt coral.Time `json:"created_at"` } `json:"history"` } `json:"username"` @@ -394,7 +394,9 @@ func TranslateUserProfile(user *coral.User, in *User, profile UserProfile) *cora Type: "google", } default: - logrus.WithField("provider", profile.Provider).Warn("unsupported provider not imported") + warnings.UnsupportedUserProfileProvider.OnceWith(func() { + logrus.WithField("provider", profile.Provider).Warn("unsupported provider not imported") + }, profile.Provider) return nil } diff --git a/strategies/legacy/models_easyjson.go b/strategies/legacy/models_easyjson.go index ba97081..f6ba0fd 100644 --- a/strategies/legacy/models_easyjson.go +++ b/strategies/legacy/models_easyjson.go @@ -120,7 +120,7 @@ func easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLegacy1(i case "id": out.ID = string(in.String()) case "provider": - out.Provider = string(in.String()) + out.Provider = string(in.StringIntern()) default: in.SkipRecursive() } @@ -533,7 +533,7 @@ func easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLegacy5(i case "username": out.Username = string(in.String()) case "role": - out.Role = string(in.String()) + out.Role = string(in.StringIntern()) case "password": out.Password = string(in.String()) case "ignoresUsers": @@ -750,10 +750,10 @@ func (v *User) UnmarshalEasyJSON(l *jlexer.Lexer) { } func easyjsonD2b7633eDecode(in *jlexer.Lexer, out *struct { Username struct { - Status string `json:"status"` + Status string `json:"status,intern"` History []struct { AssignedBy *string `json:"assigned_by"` - Status string `json:"status"` + Status string `json:"status,intern"` CreatedAt coral.Time `json:"created_at"` } `json:"history"` } `json:"username"` @@ -822,10 +822,10 @@ func easyjsonD2b7633eDecode(in *jlexer.Lexer, out *struct { } func 
easyjsonD2b7633eEncode(out *jwriter.Writer, in struct { Username struct { - Status string `json:"status"` + Status string `json:"status,intern"` History []struct { AssignedBy *string `json:"assigned_by"` - Status string `json:"status"` + Status string `json:"status,intern"` CreatedAt coral.Time `json:"created_at"` } `json:"history"` } `json:"username"` @@ -1469,10 +1469,10 @@ func easyjsonD2b7633eEncode7(out *jwriter.Writer, in struct { out.RawByte('}') } func easyjsonD2b7633eDecode1(in *jlexer.Lexer, out *struct { - Status string `json:"status"` + Status string `json:"status,intern"` History []struct { AssignedBy *string `json:"assigned_by"` - Status string `json:"status"` + Status string `json:"status,intern"` CreatedAt coral.Time `json:"created_at"` } `json:"history"` }) { @@ -1495,7 +1495,7 @@ func easyjsonD2b7633eDecode1(in *jlexer.Lexer, out *struct { } switch key { case "status": - out.Status = string(in.String()) + out.Status = string(in.StringIntern()) case "history": if in.IsNull() { in.Skip() @@ -1506,13 +1506,13 @@ func easyjsonD2b7633eDecode1(in *jlexer.Lexer, out *struct { if !in.IsDelim(']') { out.History = make([]struct { AssignedBy *string `json:"assigned_by"` - Status string `json:"status"` + Status string `json:"status,intern"` CreatedAt coral.Time `json:"created_at"` }, 0, 1) } else { out.History = []struct { AssignedBy *string `json:"assigned_by"` - Status string `json:"status"` + Status string `json:"status,intern"` CreatedAt coral.Time `json:"created_at"` }{} } @@ -1522,7 +1522,7 @@ func easyjsonD2b7633eDecode1(in *jlexer.Lexer, out *struct { for !in.IsDelim(']') { var v19 struct { AssignedBy *string `json:"assigned_by"` - Status string `json:"status"` + Status string `json:"status,intern"` CreatedAt coral.Time `json:"created_at"` } easyjsonD2b7633eDecode8(in, &v19) @@ -1542,10 +1542,10 @@ func easyjsonD2b7633eDecode1(in *jlexer.Lexer, out *struct { } } func easyjsonD2b7633eEncode1(out *jwriter.Writer, in struct { - Status string 
`json:"status"` + Status string `json:"status,intern"` History []struct { AssignedBy *string `json:"assigned_by"` - Status string `json:"status"` + Status string `json:"status,intern"` CreatedAt coral.Time `json:"created_at"` } `json:"history"` }) { @@ -1577,7 +1577,7 @@ func easyjsonD2b7633eEncode1(out *jwriter.Writer, in struct { } func easyjsonD2b7633eDecode8(in *jlexer.Lexer, out *struct { AssignedBy *string `json:"assigned_by"` - Status string `json:"status"` + Status string `json:"status,intern"` CreatedAt coral.Time `json:"created_at"` }) { isTopLevel := in.IsStart() @@ -1609,7 +1609,7 @@ func easyjsonD2b7633eDecode8(in *jlexer.Lexer, out *struct { *out.AssignedBy = string(in.String()) } case "status": - out.Status = string(in.String()) + out.Status = string(in.StringIntern()) case "created_at": if data := in.Raw(); in.Ok() { in.AddError((out.CreatedAt).UnmarshalJSON(data)) @@ -1626,7 +1626,7 @@ func easyjsonD2b7633eDecode8(in *jlexer.Lexer, out *struct { } func easyjsonD2b7633eEncode8(out *jwriter.Writer, in struct { AssignedBy *string `json:"assigned_by"` - Status string `json:"status"` + Status string `json:"status,intern"` CreatedAt coral.Time `json:"created_at"` }) { out.RawByte('{') @@ -1748,7 +1748,7 @@ func (v *CommentTag) UnmarshalEasyJSON(l *jlexer.Lexer) { easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLegacy6(l, v) } func easyjsonD2b7633eDecode9(in *jlexer.Lexer, out *struct { - Name string `json:"name"` + Name string `json:"name,intern"` }) { isTopLevel := in.IsStart() if in.IsNull() { @@ -1769,7 +1769,7 @@ func easyjsonD2b7633eDecode9(in *jlexer.Lexer, out *struct { } switch key { case "name": - out.Name = string(in.String()) + out.Name = string(in.StringIntern()) default: in.SkipRecursive() } @@ -1781,7 +1781,7 @@ func easyjsonD2b7633eDecode9(in *jlexer.Lexer, out *struct { } } func easyjsonD2b7633eEncode9(out *jwriter.Writer, in struct { - Name string `json:"name"` + Name string `json:"name,intern"` }) { out.RawByte('{') 
first := true @@ -1892,7 +1892,7 @@ func easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLegacy8(i case "asset_id": out.AssetID = string(in.String()) case "status": - out.Status = string(in.String()) + out.Status = string(in.StringIntern()) case "status_history": if in.IsNull() { in.Skip() @@ -1903,13 +1903,13 @@ func easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLegacy8(i if !in.IsDelim(']') { out.StatusHistory = make([]struct { AssignedBy *string `json:"assigned_by"` - Type string `json:"type"` + Type string `json:"type,intern"` CreatedAt coral.Time `json:"created_at"` }, 0, 1) } else { out.StatusHistory = []struct { AssignedBy *string `json:"assigned_by"` - Type string `json:"type"` + Type string `json:"type,intern"` CreatedAt coral.Time `json:"created_at"` }{} } @@ -1919,7 +1919,7 @@ func easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLegacy8(i for !in.IsDelim(']') { var v22 struct { AssignedBy *string `json:"assigned_by"` - Type string `json:"type"` + Type string `json:"type,intern"` CreatedAt coral.Time `json:"created_at"` } easyjsonD2b7633eDecode10(in, &v22) @@ -2319,7 +2319,7 @@ func easyjsonD2b7633eEncode12(out *jwriter.Writer, in struct { } func easyjsonD2b7633eDecode10(in *jlexer.Lexer, out *struct { AssignedBy *string `json:"assigned_by"` - Type string `json:"type"` + Type string `json:"type,intern"` CreatedAt coral.Time `json:"created_at"` }) { isTopLevel := in.IsStart() @@ -2351,7 +2351,7 @@ func easyjsonD2b7633eDecode10(in *jlexer.Lexer, out *struct { *out.AssignedBy = string(in.String()) } case "type": - out.Type = string(in.String()) + out.Type = string(in.StringIntern()) case "created_at": if data := in.Raw(); in.Ok() { in.AddError((out.CreatedAt).UnmarshalJSON(data)) @@ -2368,7 +2368,7 @@ func easyjsonD2b7633eDecode10(in *jlexer.Lexer, out *struct { } func easyjsonD2b7633eEncode10(out *jwriter.Writer, in struct { AssignedBy *string `json:"assigned_by"` - Type string `json:"type"` + Type 
string `json:"type,intern"` CreatedAt coral.Time `json:"created_at"` }) { out.RawByte('{') @@ -2844,13 +2844,13 @@ func easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLegacy10( case "id": out.ID = string(in.String()) case "action_type": - out.ActionType = string(in.String()) + out.ActionType = string(in.StringIntern()) case "group_id": - out.GroupID = string(in.String()) + out.GroupID = string(in.StringIntern()) case "item_id": out.ItemID = string(in.String()) case "item_type": - out.ItemType = string(in.String()) + out.ItemType = string(in.StringIntern()) case "user_id": if in.IsNull() { in.Skip() diff --git a/strategies/livefyre/comments.go b/strategies/livefyre/comments.go deleted file mode 100644 index cc5587d..0000000 --- a/strategies/livefyre/comments.go +++ /dev/null @@ -1,210 +0,0 @@ -package livefyre - -import ( - "github.com/coralproject/coral-importer/common" - "github.com/coralproject/coral-importer/common/coral" - "github.com/coralproject/coral-importer/common/pipeline" - easyjson "github.com/mailru/easyjson" - "github.com/pkg/errors" - uuid "github.com/satori/go.uuid" - "github.com/sirupsen/logrus" -) - -func ProcessComments(tenantID, siteID string, authorIDs map[string]string) pipeline.WritingProcessor { - return func(write pipeline.CollectionWriter, n *pipeline.TaskReaderInput) error { - // Parse the Story from the file. - var in Story - if err := easyjson.Unmarshal([]byte(n.Input), &in); err != nil { - return errors.Wrap(err, "could not parse a comment in the --input file") - } - - // Check the input to ensure we're validated. - if err := common.Check(&in); err != nil { - logrus.WithField("line", n.Line).WithError(err).Error("cannot validate story") - - return errors.Wrap(err, "checking failed input Story") - } - - // Translate the Story to a coral.Story. - story := TranslateStory(tenantID, siteID, &in) - - // Check the story to ensure we're validated. 
- if err := common.Check(story); err != nil { - return errors.Wrap(err, "checking failed output coral.Story") - } - - // Collect all the stories comments so we can process family - // relationships as well. - storyComments := make([]*coral.Comment, 0, len(in.Comments)) - - // Reconstruct family relationships for these comments. - r := common.NewReconstructor() - - // Store the reaction total for the story. - storyReactionTotal := 0 - - // Translate the comments. - for i, inc := range in.Comments { - if inc.AuthorID == "" { - logrus.WithFields(logrus.Fields{ - "storyID": story.ID, - "commentID": inc.ID, - "line": n.Line, - }).Warn("comment was missing author_id field") - - continue - } - - // Check the comment to ensure we're validated. - if err := common.Check(&in.Comments[i]); err != nil { - return errors.Wrapf(err, "checking failed input Comment for Story %s", story.ID) - } - - // Remap the authorID. - authorID := authorIDs[inc.AuthorID] - if authorID == "" { - logrus.WithFields(logrus.Fields{ - "storyID": story.ID, - "commentID": inc.ID, - "authorID": inc.AuthorID, - "line": n.Line, - }).Warn("comment author_id did not exist in author map") - - continue - } - inc.AuthorID = authorID - - // Translate the Comment to a coral.Comment. - comment := TranslateComment(tenantID, siteID, &in.Comments[i]) - comment.StoryID = story.ID - - // Check the comment to ensure we're validated. - if err := common.Check(comment); err != nil { - return errors.Wrap(err, "checking failed output coral.Comment") - } - - // Add the comment to the reconstructor. - r.AddComment(comment) - - // Look at the comment to see if there are any likes on it. - if inc.Likes != nil { - reactionTotal := 0 - for _, likeUserID := range inc.Likes { - // Remap the like user ID to the one from the author map. If - // we can't remap the user id it means we don't have a user - // for this like, and therefore it shouldn't be imported - // either. 
- mappedLikeUserID := authorIDs[likeUserID] - if mappedLikeUserID == "" { - logrus.WithFields(logrus.Fields{ - "storyID": story.ID, - "commentID": inc.ID, - "like": likeUserID, - "line": n.Line, - }).Warn("could not find user ID of like in author map, not importing like") - - continue - } - - // Create a new Comment Action for this like. - action := coral.NewCommentAction(tenantID, siteID) - action.ID = uuid.NewV4().String() - action.ActionType = "REACTION" - action.CommentID = comment.ID - action.UserID = &mappedLikeUserID - action.CommentRevisionID = comment.ID - action.StoryID = story.ID - - // Check the action to ensure we're validated. - if err := common.Check(action); err != nil { - return errors.Wrap(err, "checking failed output coral.CommentAction") - } - - if err := write("commentActions", action); err != nil { - return errors.Wrap(err, "couldn't write out commentAction") - } - - logrus.WithFields(logrus.Fields{ - "storyID": story.ID, - "commentID": comment.ID, - "line": n.Line, - }).Debug("imported reaction") - - reactionTotal++ - } - - // Add the reaction count to the comment. - comment.ActionCounts["REACTION"] = reactionTotal - storyReactionTotal += reactionTotal - } - - // Add it to the story comments. - storyComments = append(storyComments, comment) - } - - if len(storyComments) == 0 { - logrus.WithFields(logrus.Fields{ - "storyID": story.ID, - "line": n.Line, - }).Warn("no comments imported from story") - } - - // Send the comments off to the importer. - for _, comment := range storyComments { - // Add reconstructed data. - comment.ChildIDs = r.GetChildren(comment.ID) - comment.ChildCount = len(comment.ChildIDs) - comment.AncestorIDs = r.GetAncestors(comment.ID) - - // Send the comment to the importer. 
- if err := write("comments", comment); err != nil { - return errors.Wrap(err, "couldn't write out comment") - } - - logrus.WithFields(logrus.Fields{ - "storyID": story.ID, - "commentID": comment.ID, - "line": n.Line, - }).Debug("imported comment") - } - - // Increment the stories comment counts. - for _, comment := range storyComments { - story.IncrementCommentCounts(comment.Status) - } - story.CommentCounts.Action["REACTION"] = storyReactionTotal - - // Send the story to the importer. - if err := write("stories", story); err != nil { - return errors.Wrap(err, "couldn't write out story") - } - - logrus.WithFields(logrus.Fields{ - "storyID": story.ID, - "line": n.Line, - }).Debug("imported story") - - logrus.WithFields(logrus.Fields{ - "storyID": story.ID, - "line": n.Line, - "comments": len(storyComments), - }).Info("finished line") - - return nil - } -} - -func ProcessCommentStatusMap() pipeline.SummerProcessor { - return func(writer pipeline.SummerWriter, n *pipeline.TaskReaderInput) error { - // Parse the comment from the file. - var comment coral.Comment - if err := easyjson.Unmarshal([]byte(n.Input), &comment); err != nil { - return errors.Wrap(err, "could not parse an comment") - } - - // Add the status to the map referencing the user id. - writer(comment.AuthorID, comment.Status, 1) - - return nil - } -} diff --git a/strategies/livefyre/livefyre.go b/strategies/livefyre/livefyre.go deleted file mode 100644 index 1b6f75c..0000000 --- a/strategies/livefyre/livefyre.go +++ /dev/null @@ -1,131 +0,0 @@ -package livefyre - -import ( - "path/filepath" - "strings" - "time" - - "github.com/coralproject/coral-importer/common/pipeline" - "github.com/coralproject/coral-importer/strategies" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/urfave/cli" -) - -// Time is the time.Time representation that LiveFyre uses. -type Time struct { - time.Time -} - -// UnmarshalJSON is the custom unmarshaler for the input JSON from LiveFyre. 
-func (t *Time) UnmarshalJSON(buf []byte) error { - tt, err := time.Parse("2006-01-02T15:04:05", strings.Trim(string(buf), `"`)) - if err != nil { - return errors.Wrap(err, "could not parse livefyre time") - } - - t.Time = tt - - return nil -} - -func CLI(c *cli.Context) error { - return Import(c) -} - -// Import will handle a data import task for importing comments into Coral -// from a LiveFyre export. -func Import(c strategies.Context) error { - // tenantID is the ID of the Tenant that we are importing these documents - // for. - tenantID := c.String("tenantID") - - // siteID is the ID of the Site that we're importing records for. - siteID := c.String("siteID") - - // commentsFileName is the name of the file that that contains the comment - // exports from LiveFyre. - commentsFileName := c.String("comments") - - // usersFileName is the name of the file that that contains the comment - // exports from LiveFyre. - usersFileName := c.String("users") - - // folder is the name of the folder where we are placing our outputted dumps - // ready for MongoDB import. - folder := c.String("output") - - // sso when true indicates that we should create a sso profile for generated - // users. - sso := c.Bool("sso") - - // Mark when we started. - started := time.Now() - logrus.Info("started") - - // Process the users file first because we need to de-duplicate users as - // they are parsed because LiveFyre did not lowercase email addresses, - // causing multiple users to be created for each email address variation. - users, err := pipeline.NewMapAggregator( - pipeline.MergeTaskAggregatorOutputPipelines( - pipeline.FanAggregatingProcessor( - pipeline.NewJSONFileReader(usersFileName), - ProcessUsersMap(), - ), - ), - ) - if err != nil { - return errors.Wrap(err, "could not aggregate users") - } - - logrus.WithField("users", len(users["id"])).Info("loaded users") - - // Genreate the users association from the id map. 
- uniqueUsers := make(map[string]string) - for _, ids := range users["id"] { - for _, id := range ids { - uniqueUsers[id] = ids[0] - } - } - - // Create the processor that will write these entries out. - if err := pipeline.NewFileWriter( - folder, - pipeline.MergeTaskWriterOutputPipelines( - pipeline.FanWritingProcessors( - pipeline.NewJSONFileReader(commentsFileName), - ProcessComments(tenantID, siteID, uniqueUsers), - ), - ), - ); err != nil { - return errors.Wrap(err, "could not process comments and stories for writing") - } - - // Load all the comment statuses by reading the comments.json file again. - statusCounts, err := pipeline.NewSummer( - pipeline.MergeTaskSummerOutputPipelines( - pipeline.FanSummerProcessor( - pipeline.NewJSONFileReader(filepath.Join(folder, "comments.json")), - ProcessCommentStatusMap(), - ), - ), - ) - if err != nil { - return errors.Wrap(err, "could not process status counts") - } - - if err := pipeline.NewFileWriter( - folder, - ProcessUsers(tenantID, sso, users, statusCounts), - ); err != nil { - return errors.Wrap(err, "could not write out users") - } - - logrus.WithField("users", len(users["id"])).Info("wrote users") - - // Mark when we finished. - finished := time.Now() - logrus.WithField("took", finished.Sub(started).String()).Info("finished processing") - - return nil -} diff --git a/strategies/livefyre/models.go b/strategies/livefyre/models.go deleted file mode 100644 index 202b304..0000000 --- a/strategies/livefyre/models.go +++ /dev/null @@ -1,121 +0,0 @@ -//go:generate easyjson -all models.go -package livefyre - -import ( - "fmt" - "time" - - "github.com/coralproject/coral-importer/common/coral" -) - -// Comment is the Comment as exported from the LiveFyre platform. 
-type Comment struct { - ID int `json:"id" validate:"required"` - BodyHTML string `json:"body_html" validate:"required"` - ParentID int `json:"parent_id"` - AuthorID string `json:"author_id"` - Likes []string `json:"likes"` - State int `json:"state"` - Created Time `json:"created" validate:"required"` -} - -// TranslateComment will copy over simple fields to the new coral.Comment. -func TranslateComment(tenantID, siteID string, in *Comment) *coral.Comment { - comment := coral.NewComment(tenantID, siteID) - comment.ID = fmt.Sprintf("%d", in.ID) - if in.ParentID > 0 { - comment.ParentID = fmt.Sprintf("%d", in.ParentID) - comment.ParentRevisionID = comment.ParentID - } - comment.AuthorID = in.AuthorID - comment.ActionCounts = map[string]int{} - comment.Tags = []coral.CommentTag{} - comment.CreatedAt.Time = in.Created.Time - - switch in.State { - case 0: - comment.Status = "REJECTED" - case 1: - comment.Status = "APPROVED" - case 2: - comment.Status = "REJECTED" - case 3: - comment.Status = "NONE" - case 4: - comment.Status = "PREMOD" - case 5: - comment.Status = "REJECTED" - default: - comment.Status = "NONE" - } - - revision := coral.Revision{ - ID: comment.ID, - Body: coral.HTML(in.BodyHTML), - Metadata: coral.RevisionMetadata{}, - ActionCounts: map[string]int{}, - } - revision.CreatedAt.Time = in.Created.Time - - comment.Revisions = append(comment.Revisions, revision) - - return comment -} - -// Story is the Story as exported from the LiveFyre platform. -type Story struct { - ID string `json:"id" validate:"required"` - Title string `json:"title"` - Source string `json:"source" validate:"required,url"` - Comments []Comment `json:"comments"` - Created Time `json:"created" validate:"required"` -} - -// TranslateStory will copy over simple fields to the new coral.Story. 
-func TranslateStory(tenantID, siteID string, in *Story) *coral.Story { - story := coral.NewStory(tenantID, siteID) - story.ID = in.ID - story.URL = in.Source - story.Metadata.Title = in.Title - story.CreatedAt.Time = in.Created.Time - - return story -} - -// User represents a User in the LiveFyre platform. -type User struct { - ID string `json:"id" validate:"required"` - DisplayName string `json:"display_name"` - Email string `json:"email" validate:"email,required" conform:"email,lower"` -} - -// TranslateUser will transform a LiveFyre User to a coral.User. -func TranslateUser(tenantID string, in *User, now time.Time) *coral.User { - user := coral.NewUser(tenantID) - user.ID = in.ID - user.Email = in.Email - user.Username = in.DisplayName - - // Add the usernamme history item. - history := coral.UserUsernameStatusHistory{ - ID: in.ID, - Username: in.DisplayName, - CreatedBy: in.ID, - } - history.CreatedAt.Time = now - - user.Status.UsernameStatus.History = append(user.Status.UsernameStatus.History, history) - - // Add the user profile. - profile := coral.UserProfile{ - ID: in.ID, - Type: "sso", - } - profile.LastIssuedAt = &coral.Time{ - Time: now, - } - - user.Profiles = append(user.Profiles, profile) - - return user -} diff --git a/strategies/livefyre/models_easyjson.go b/strategies/livefyre/models_easyjson.go deleted file mode 100644 index 7c29177..0000000 --- a/strategies/livefyre/models_easyjson.go +++ /dev/null @@ -1,369 +0,0 @@ -// Code generated by easyjson for marshaling/unmarshaling. DO NOT EDIT. 
- -package livefyre - -import ( - json "encoding/json" - easyjson "github.com/mailru/easyjson" - jlexer "github.com/mailru/easyjson/jlexer" - jwriter "github.com/mailru/easyjson/jwriter" -) - -// suppress unused package warning -var ( - _ *json.RawMessage - _ *jlexer.Lexer - _ *jwriter.Writer - _ easyjson.Marshaler -) - -func easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLivefyre(in *jlexer.Lexer, out *User) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeFieldName(false) - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "id": - out.ID = string(in.String()) - case "display_name": - out.DisplayName = string(in.String()) - case "email": - out.Email = string(in.String()) - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjsonD2b7633eEncodeGithubComCoralprojectCoralImporterStrategiesLivefyre(out *jwriter.Writer, in User) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"id\":" - out.RawString(prefix[1:]) - out.String(string(in.ID)) - } - { - const prefix string = ",\"display_name\":" - out.RawString(prefix) - out.String(string(in.DisplayName)) - } - { - const prefix string = ",\"email\":" - out.RawString(prefix) - out.String(string(in.Email)) - } - out.RawByte('}') -} - -// MarshalJSON supports json.Marshaler interface -func (v User) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - easyjsonD2b7633eEncodeGithubComCoralprojectCoralImporterStrategiesLivefyre(&w, v) - return w.Buffer.BuildBytes(), w.Error -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v User) MarshalEasyJSON(w *jwriter.Writer) { - easyjsonD2b7633eEncodeGithubComCoralprojectCoralImporterStrategiesLivefyre(w, v) -} - -// UnmarshalJSON supports json.Unmarshaler interface -func (v 
*User) UnmarshalJSON(data []byte) error { - r := jlexer.Lexer{Data: data} - easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLivefyre(&r, v) - return r.Error() -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *User) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLivefyre(l, v) -} -func easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLivefyre1(in *jlexer.Lexer, out *Story) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeFieldName(false) - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "id": - out.ID = string(in.String()) - case "title": - out.Title = string(in.String()) - case "source": - out.Source = string(in.String()) - case "comments": - if in.IsNull() { - in.Skip() - out.Comments = nil - } else { - in.Delim('[') - if out.Comments == nil { - if !in.IsDelim(']') { - out.Comments = make([]Comment, 0, 0) - } else { - out.Comments = []Comment{} - } - } else { - out.Comments = (out.Comments)[:0] - } - for !in.IsDelim(']') { - var v1 Comment - (v1).UnmarshalEasyJSON(in) - out.Comments = append(out.Comments, v1) - in.WantComma() - } - in.Delim(']') - } - case "created": - if data := in.Raw(); in.Ok() { - in.AddError((out.Created).UnmarshalJSON(data)) - } - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjsonD2b7633eEncodeGithubComCoralprojectCoralImporterStrategiesLivefyre1(out *jwriter.Writer, in Story) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"id\":" - out.RawString(prefix[1:]) - out.String(string(in.ID)) - } - { - const prefix string = ",\"title\":" - out.RawString(prefix) - out.String(string(in.Title)) - } - { - const prefix string = ",\"source\":" - 
out.RawString(prefix) - out.String(string(in.Source)) - } - { - const prefix string = ",\"comments\":" - out.RawString(prefix) - if in.Comments == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { - out.RawString("null") - } else { - out.RawByte('[') - for v2, v3 := range in.Comments { - if v2 > 0 { - out.RawByte(',') - } - (v3).MarshalEasyJSON(out) - } - out.RawByte(']') - } - } - { - const prefix string = ",\"created\":" - out.RawString(prefix) - out.Raw((in.Created).MarshalJSON()) - } - out.RawByte('}') -} - -// MarshalJSON supports json.Marshaler interface -func (v Story) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - easyjsonD2b7633eEncodeGithubComCoralprojectCoralImporterStrategiesLivefyre1(&w, v) - return w.Buffer.BuildBytes(), w.Error -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v Story) MarshalEasyJSON(w *jwriter.Writer) { - easyjsonD2b7633eEncodeGithubComCoralprojectCoralImporterStrategiesLivefyre1(w, v) -} - -// UnmarshalJSON supports json.Unmarshaler interface -func (v *Story) UnmarshalJSON(data []byte) error { - r := jlexer.Lexer{Data: data} - easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLivefyre1(&r, v) - return r.Error() -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *Story) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLivefyre1(l, v) -} -func easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLivefyre2(in *jlexer.Lexer, out *Comment) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeFieldName(false) - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "id": - out.ID = int(in.Int()) - case "body_html": - out.BodyHTML = string(in.String()) - case "parent_id": - out.ParentID = int(in.Int()) - case "author_id": - out.AuthorID = 
string(in.String()) - case "likes": - if in.IsNull() { - in.Skip() - out.Likes = nil - } else { - in.Delim('[') - if out.Likes == nil { - if !in.IsDelim(']') { - out.Likes = make([]string, 0, 4) - } else { - out.Likes = []string{} - } - } else { - out.Likes = (out.Likes)[:0] - } - for !in.IsDelim(']') { - var v4 string - v4 = string(in.String()) - out.Likes = append(out.Likes, v4) - in.WantComma() - } - in.Delim(']') - } - case "state": - out.State = int(in.Int()) - case "created": - if data := in.Raw(); in.Ok() { - in.AddError((out.Created).UnmarshalJSON(data)) - } - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjsonD2b7633eEncodeGithubComCoralprojectCoralImporterStrategiesLivefyre2(out *jwriter.Writer, in Comment) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"id\":" - out.RawString(prefix[1:]) - out.Int(int(in.ID)) - } - { - const prefix string = ",\"body_html\":" - out.RawString(prefix) - out.String(string(in.BodyHTML)) - } - { - const prefix string = ",\"parent_id\":" - out.RawString(prefix) - out.Int(int(in.ParentID)) - } - { - const prefix string = ",\"author_id\":" - out.RawString(prefix) - out.String(string(in.AuthorID)) - } - { - const prefix string = ",\"likes\":" - out.RawString(prefix) - if in.Likes == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { - out.RawString("null") - } else { - out.RawByte('[') - for v5, v6 := range in.Likes { - if v5 > 0 { - out.RawByte(',') - } - out.String(string(v6)) - } - out.RawByte(']') - } - } - { - const prefix string = ",\"state\":" - out.RawString(prefix) - out.Int(int(in.State)) - } - { - const prefix string = ",\"created\":" - out.RawString(prefix) - out.Raw((in.Created).MarshalJSON()) - } - out.RawByte('}') -} - -// MarshalJSON supports json.Marshaler interface -func (v Comment) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - 
easyjsonD2b7633eEncodeGithubComCoralprojectCoralImporterStrategiesLivefyre2(&w, v) - return w.Buffer.BuildBytes(), w.Error -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v Comment) MarshalEasyJSON(w *jwriter.Writer) { - easyjsonD2b7633eEncodeGithubComCoralprojectCoralImporterStrategiesLivefyre2(w, v) -} - -// UnmarshalJSON supports json.Unmarshaler interface -func (v *Comment) UnmarshalJSON(data []byte) error { - r := jlexer.Lexer{Data: data} - easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLivefyre2(&r, v) - return r.Error() -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *Comment) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjsonD2b7633eDecodeGithubComCoralprojectCoralImporterStrategiesLivefyre2(l, v) -} diff --git a/strategies/livefyre/users.go b/strategies/livefyre/users.go deleted file mode 100644 index 9197d04..0000000 --- a/strategies/livefyre/users.go +++ /dev/null @@ -1,102 +0,0 @@ -package livefyre - -import ( - "fmt" - "time" - - "github.com/coralproject/coral-importer/common" - "github.com/coralproject/coral-importer/common/coral" - "github.com/coralproject/coral-importer/common/pipeline" - easyjson "github.com/mailru/easyjson" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -func ProcessUsersMap() pipeline.AggregatingProcessor { - return func(writer pipeline.AggregationWriter, input *pipeline.TaskReaderInput) error { - // Parse the User from the file. - var in User - if err := easyjson.Unmarshal([]byte(input.Input), &in); err != nil { - return errors.Wrap(err, "could not parse a user in the --users file") - } - - // Check the input to ensure we're validated. - if err := common.Check(&in); err != nil { - logrus.WithError(err).WithField("line", input.Line).Warn("validation failed for input user") - - return nil - } - - // Write the user details out to the writer. 
- writer("id", in.Email, in.ID) - writer("display_name", in.Email, in.DisplayName) - - return nil - } -} - -func ProcessUsers(tenantID string, sso bool, users map[string]map[string][]string, statusCounts map[string]map[string]int) <-chan pipeline.TaskWriterOutput { - out := make(chan pipeline.TaskWriterOutput) - go func() { - defer close(out) - - now := time.Now() - - for email, displayNames := range users["display_name"] { - // Grab this User's ID's. - id := users["id"][email][0] - - // See if the user has even one display name. - if len(displayNames) == 0 { - displayNames = []string{ - fmt.Sprintf("User %s", id), - } - } - - // Build a coral.User from the user information we have. - user := TranslateUser(tenantID, &User{ - ID: id, - Email: email, - DisplayName: displayNames[0], - }, now) - if sso { - user.Profiles = append(user.Profiles, coral.UserProfile{ - - ID: user.ID, - Type: "sso", - LastIssuedAt: &user.CreatedAt, - }) - } - - // Get the status counts for this user. - userStatusCounts := statusCounts[user.ID] - user.CommentCounts.Status.Approved = userStatusCounts["APPROVED"] - user.CommentCounts.Status.None = userStatusCounts["NONE"] - user.CommentCounts.Status.Premod = userStatusCounts["PREMOD"] - user.CommentCounts.Status.Rejected = userStatusCounts["REJECTED"] - user.CommentCounts.Status.SystemWithheld = userStatusCounts["SYSTEM_WITHHELD"] - - // Serialize the user for output. - doc, err := easyjson.Marshal(user) - if err != nil { - out <- pipeline.TaskWriterOutput{ - Error: errors.Wrap(err, "could not marshal the coral.User"), - } - - return - } - - logrus.WithFields(logrus.Fields{ - "userID": user.ID, - }).Debug("imported user") - - // Write the user out. - out <- pipeline.TaskWriterOutput{ - Collection: "users", - Document: doc, - } - } - }() - - return out -}