This repository has been archived by the owner on May 27, 2020. It is now read-only.

Commit c9a0ccd: Initial version
robertomorandeira committed Feb 2, 2015 (0 parents)
Showing 14 changed files with 724 additions and 0 deletions.
16 changes: 16 additions & 0 deletions .gitignore
@@ -0,0 +1,16 @@
hs_err_pid*.log
nohup.out
.idea
*.iml
**/.idea
*/.classpath
*/.project
*/.settings
*/.cache
*/test-output/
*.log
*/*.versionsBackup
target/
*GitIgnored*
*.asc
*.gpg
278 changes: 278 additions & 0 deletions pom.xml
@@ -0,0 +1,278 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.stratio</groupId>
<artifactId>deep-sql</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>deep-sql</name>
<url>http://github.com/Stratio/deep-sql</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- javac expects "1.7", not "1.7.0", as a -source/-target value -->
<java.version>1.7</java.version>
<scala.version>2.10.4</scala.version>
<scala.test.version>2.2.1</scala.test.version>
<scala.binary.version>2.10</scala.binary.version>
<scala.mock.version>3.2.1</scala.mock.version>
<spark.version>1.2.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>${scala.test.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-hadoop-core</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>

<build>
<pluginManagement>
<plugins>

<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.7.1.201405082137</version>
<executions>
<execution>
<id>unit</id>
<goals>
<goal>prepare-agent</goal>
</goals>
<configuration>
<destFile>${project.build.directory}/jacocoUT.exec</destFile>
<propertyName>jacocoUT-argline</propertyName>
</configuration>
</execution>
<execution>
<id>integration</id>
<goals>
<goal>prepare-agent</goal>
</goals>
<configuration>
<destFile>${project.build.directory}/jacocoIT.exec</destFile>
<propertyName>jacocoIT-argline</propertyName>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
</plugin>


<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>cobertura-maven-plugin</artifactId>
<version>2.6</version>
<configuration>
<aggregate>true</aggregate>
<formats>
<format>xml</format>
</formats>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18</version>
<configuration>
<includes>
<include>**/*Test.java</include>
</includes>
<!-- A single argLine: a second <argLine> element would override the first
and silently drop the JaCoCo agent settings. -->
<argLine>${jacocoUT-argline} -noverify</argLine>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.18</version>
<configuration>
<includes>
<include>**/*FT.java</include>
</includes>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<includes>
<include>**/*FT.class</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
</pluginManagement>


<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/alternateLocation</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<!-- Note: excludeArtifactIds takes a comma-separated list of artifactIds;
"::*" matches no artifact, so as written nothing is excluded. -->
<excludeArtifactIds>::*</excludeArtifactIds>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>

</build>

</project>
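
A note on the test wiring above: Surefire runs unit tests matching **/*Test during mvn test, while Failsafe runs integration tests matching **/*FT during mvn verify. The classes below are hypothetical, sketched only to illustrate the naming split (assuming ScalaTest 2.2.1 from the dependency list):

import org.scalatest.FlatSpec

// Hypothetical unit test: picked up by Surefire via the **/*Test include.
class MongodbSchemaTest extends FlatSpec {
  "the schema" should "be inferable from a sample document" in {
    assert(true)
  }
}

// Hypothetical integration test: picked up by Failsafe via the **/*FT include.
class MongodbRelationFT extends FlatSpec {
  "the relation" should "scan a live MongoDB collection" in {
    assert(true)
  }
}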
40 changes: 40 additions & 0 deletions src/main/scala/com/stratio/deep/mongodb/DefaultSource.scala
@@ -0,0 +1,40 @@
package com.stratio.deep.mongodb

import com.mongodb.{BasicDBList, BasicDBObject}
import com.stratio.deep.mongodb.rdd.MongodbRowRDD
import com.stratio.deep.mongodb.schema.MongodbSchema
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.{Row, SQLContext, StructType}

/**
* Created by rmorandeira on 29/01/15.
*/

class DefaultSource extends RelationProvider {
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
val host = parameters.getOrElse("mongodb.host", sys.error("Option 'mongodb.host' not specified"))
val database = parameters.getOrElse("mongodb.database", sys.error("Option 'mongodb.database' not specified"))
val collection = parameters.getOrElse("mongodb.collection", sys.error("Option 'mongodb.collection' not specified"))
val samplingRatio = parameters.get("mongodb.schema.samplingRatio").map(_.toDouble).getOrElse(1.0)

MongodbRelation(host, database, collection, samplingRatio)(sqlContext)
}
}

case class MongodbRelation(host: String, database: String, collection: String, samplingRatio: Double = 1.0)
(@transient val sqlContext: SQLContext) extends TableScan {

override def schema: StructType = lazySchema.toStructType()

// Placeholder schema: built from a hard-coded sample document rather than by
// sampling the collection, so samplingRatio is not used yet.
@transient lazy val lazySchema = {
val list = new BasicDBList()
list.add("one")
MongodbSchema.schema(new BasicDBObject("_id", 1).append("name", "name").append("players", list).append("other",
new BasicDBObject("pepe", 3)))
}

override def buildScan(): RDD[Row] = {
new MongodbRowRDD(sqlContext, lazySchema, host, database, collection)
}
}
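
For orientation, here is a minimal sketch (untested, and not part of this commit) of how this provider could be exercised; the connection values are placeholders and sc is assumed to be an existing SparkContext:

import com.stratio.deep.mongodb.DefaultSource
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc: SparkContext = ??? // assumed: an existing SparkContext, e.g. from a shell
val sqlContext = new SQLContext(sc)

// DefaultSource fails fast (sys.error) if any required mongodb.* option is missing.
val relation = new DefaultSource().createRelation(sqlContext, Map(
  "mongodb.host" -> "localhost:27017",
  "mongodb.database" -> "test",
  "mongodb.collection" -> "students"))

val students = sqlContext.baseRelationToSchemaRDD(relation)
students.registerTempTable("students")
sqlContext.sql("SELECT * FROM students").collect().foreach(println)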
26 changes: 26 additions & 0 deletions src/main/scala/com/stratio/deep/mongodb/package.scala
@@ -0,0 +1,26 @@
package com.stratio.deep

import org.apache.spark.sql.{SQLContext, SchemaRDD}

/**
* Created by rmorandeira on 28/01/15.
*/
package object mongodb {
/**
* Adds a method, fromMongoDB, to SQLContext that allows reading data stored in MongoDB.
*/
implicit class MongodbContext(sqlContext: SQLContext) {
def fromMongoDB(host: String, database: String, collection: String, samplingRatio: Double = 1.0): SchemaRDD = {
sqlContext.baseRelationToSchemaRDD(MongodbRelation(host, database, collection, samplingRatio)(sqlContext))
}
}

/**
* Adds a method, saveToMongodb, to SchemaRDD that allows storing data in MongoDB.
*/
// TODO:
implicit class MongodbSchemaRDD(schemaRDD: SchemaRDD) {
def saveToMongodb(parameters: Map[String, String]): Unit = ???
}

}
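
And a sketch of the implicit in use, again as a hypothetical session with placeholder connection values:

import com.stratio.deep.mongodb._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc: SparkContext = ??? // assumed to exist
val sqlContext = new SQLContext(sc)

// The implicit MongodbContext enriches SQLContext with fromMongoDB.
val students = sqlContext.fromMongoDB("localhost:27017", "test", "students")
students.printSchema()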
12 changes: 12 additions & 0 deletions src/main/scala/com/stratio/deep/mongodb/rdd/MongodbPartition.scala
@@ -0,0 +1,12 @@
package com.stratio.deep.mongodb.rdd

import org.apache.spark.Partition

/**
* Created by rmorandeira on 29/01/15.
*/
class MongodbPartition(rddId: Int, idx: Int)
extends Partition {
// Stable hash combining the parent RDD id and the partition index.
override def hashCode(): Int = 41 * (41 * (41 + rddId) + idx)
override val index: Int = idx
}
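
MongodbRowRDD, which consumes these partitions, is part of this commit but not shown in this excerpt. As a rough sketch (an assumption, not the commit's actual code), its getPartitions could build one MongodbPartition per split along these lines:

import org.apache.spark.Partition

// Hypothetical helper, illustrating one partition per split.
def makePartitions(rddId: Int, numSplits: Int): Array[Partition] =
  (0 until numSplits).map(i => new MongodbPartition(rddId, i)).toArray[Partition]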
