Skip to content

Commit

Permalink
add uploader to save to external storage
Browse files Browse the repository at this point in the history
  • Loading branch information
YuheiNakasaka committed Apr 11, 2018
1 parent 123ebd8 commit 646a6c3
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 7 deletions.
47 changes: 46 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions cmd/cli/cli.go
Expand Up @@ -34,15 +34,20 @@ func main() {
Name: "time, t",
Usage: "set airtime",
},
cli.StringFlag{
Name: "storage",
Value: "",
Usage: "use external storage or not(default is local)",
},
},
Action: func(c *cli.Context) error {
switch c.String("station") {
case "ag":
recorder := ag.Ag{}
return recorder.Start(c.Int("id"), c.Int("time"))
return recorder.Start(c.Int("id"), c.Int("time"), c.String("storage"))
case "radiko":
recorder := radiko.Radiko{}
return recorder.Start(c.Int("id"), c.Int("time"))
return recorder.Start(c.Int("id"), c.Int("time"), c.String("storage"))
default:
return fmt.Errorf("radio station not found(e.g -s ag)")
}
Expand Down
9 changes: 8 additions & 1 deletion internal/recorder/ag/ag.go
Expand Up @@ -10,6 +10,7 @@ import (
type Ag struct {
programID int
airtime int
storage string
}

// ProgramID is method to fill recorder.Recorder interface.
Expand All @@ -22,16 +23,22 @@ func (a *Ag) Airtime() int {
return a.airtime
}

// Storage is method to fill recorder.Recorder interface.
func (a *Ag) Storage() string {
return a.storage
}

// RecordCommand is method to fill recorder.Recorder interface.
// It returns rtmpdump command to record during airtime.
func (a *Ag) RecordCommand(outputPath string) string {
return "rtmpdump -q -r rtmp://fms-base2.mitene.ad.jp/agqr/aandg2 --live --stop " + strconv.Itoa(a.airtime) + " -o " + outputPath + ".flv"
}

// Start : record ag program
func (a *Ag) Start(programID int, airtime int) error {
func (a *Ag) Start(programID int, airtime int, storage string) error {
ag := &Ag{}
ag.programID = programID
ag.airtime = airtime
ag.storage = storage
return recorder.Record(ag)
}
9 changes: 8 additions & 1 deletion internal/recorder/radiko/radiko.go
Expand Up @@ -27,6 +27,7 @@ var (
type Radiko struct {
programID int
airtime int
storage string
}

// ProgramID is method to fill recorder.Recorder interface.
Expand All @@ -39,6 +40,11 @@ func (r *Radiko) Airtime() int {
return r.airtime
}

// Storage is method to fill recorder.Recorder interface.
func (r *Radiko) Storage() string {
return r.storage
}

// RecordCommand is method to fill recorder.Recorder interface.
// It returns rtmpdump command to record during airtime.
func (r *Radiko) RecordCommand(outputPath string) string {
Expand All @@ -47,10 +53,11 @@ func (r *Radiko) RecordCommand(outputPath string) string {
}

// Start : record ag program
func (r *Radiko) Start(programID int, airtime int) error {
func (r *Radiko) Start(programID int, airtime int, storage string) error {
radiko := &Radiko{}
radiko.programID = programID
radiko.airtime = airtime
radiko.storage = storage
return recorder.Record(radiko)
}

Expand Down
9 changes: 7 additions & 2 deletions internal/recorder/recorder.go
Expand Up @@ -8,13 +8,15 @@ import (

"github.com/YuheiNakasaka/radiorec/internal/db"
"github.com/YuheiNakasaka/radiorec/internal/filemanager"
"github.com/YuheiNakasaka/radiorec/internal/uploader/s3"
"github.com/mattn/go-shellwords"
)

// Recorder is interface to record radio
type Recorder interface {
ProgramID() int
Airtime() int
Storage() string
RecordCommand(string) string
}

Expand Down Expand Up @@ -89,8 +91,11 @@ func Record(r Recorder) error {
fmt.Println("Registering...")
mydb.InsertProgramContent(programID, fileManager.FilePath+".mp4")

// S3にアップロード
// uploader.Upload(outputPath+".mp4", filePath+".mp4")
// upload file to external storage
if r.Storage() == "s3" {
awsS3 := s3.AwsS3{}
err = awsS3.Upload(fileManager.OutputPath+".mp4", fileManager.FilePath+".mp4")
}

wg.Done()
}()
Expand Down
63 changes: 63 additions & 0 deletions internal/uploader/s3/s3.go
@@ -0,0 +1,63 @@
package s3

import (
"fmt"
"log"
"os"

"github.com/YuheiNakasaka/radiorec/config"
"github.com/YuheiNakasaka/radiorec/internal/uploader"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awsutil"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)

// AwsS3 is struct to upload to aws s3, which fill uploader interface.
type AwsS3 struct {
uploader.Uploader
}

// Upload : upload to s3
func (s *AwsS3) Upload(path string, filename string) error {
fmt.Println("Uploading...")

// Read config file
myconf := config.Config{}
err := myconf.Init()
if err != nil {
return fmt.Errorf("Failed to load config %v", err)
}

accessKeyID := fmt.Sprintf("%v", myconf.List.Get("aws.s3.access_key_id"))
secretAccessKey := fmt.Sprintf("%v", myconf.List.Get("aws.s3.secret_access_key"))
region := fmt.Sprintf("%v", myconf.List.Get("aws.s3.region"))
bucketName := fmt.Sprintf("%v", myconf.List.Get("aws.s3.bucket"))

file, err := os.Open(path)
if err != nil {
return fmt.Errorf("Failed to open file: %v", err)
}
defer file.Close()

cli := s3.New(session.New(), &aws.Config{
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
Region: aws.String(region),
})

resp, err := cli.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(filename),
ACL: aws.String("public-read"),
ContentType: aws.String("video/mp4"),
Body: file,
})
if err != nil {
return fmt.Errorf("Failed to upload: %v", err)
}

log.Println(awsutil.StringValue(resp))

return err
}
6 changes: 6 additions & 0 deletions internal/uploader/uploader.go
@@ -0,0 +1,6 @@
package uploader

// Uploader is interface of uploading tasks
type Uploader interface {
Upload(string, string) error
}

0 comments on commit 646a6c3

Please sign in to comment.