
Spark Core Module


This module includes support for HDFS and FS (CSV & JSON). Users can read and write data from/to a file on HDFS or in the local file system as CSV or JSON. SQL queries can also be run over the data.

Support

To use it, add the following dependency to pom.xml:

<dependency>
     <groupId>com.impetus.kundera.client</groupId>
     <artifactId>kundera-spark</artifactId>
     <version>${kundera.version}</version>
</dependency>

Persistence unit configuration

 <persistence-unit name="spark_hdfs_pu">
   <provider>com.impetus.kundera.KunderaPersistence</provider>
   <properties>
      <property name="kundera.nodes" value="localhost" />
      <property name="kundera.port" value="7077" />
      <property name="kundera.keyspace" value="sparktest" />
      <property name="kundera.dialect" value="spark" />
      <property name="kundera.client" value="hdfs" />
      <property name="kundera.client.lookup.class" value="com.impetus.spark.client.SparkClientFactory" />
      <property name="kundera.client.property" value="KunderaSparkTest.xml" />
   </properties>
</persistence-unit>

Spark Related Properties

Spark-related properties are supplied through an XML file. For example, the persistence unit above references KunderaSparkTest.xml.

Sample Property File:

<?xml version="1.0" encoding="UTF-8"?>
<clientProperties>
   <datastores>
      <dataStore>
         <name>hdfs</name>
         <connection>
            <properties>
               <property name="spark.master" value="local" />
               <property name="spark.app.name" value="sparkhdfs" />
               <property name="spark.executor.memory" value="1g" />
               <property name="spark.driver.allowMultipleContexts" value="true" />
            </properties>
         </connection>
      </dataStore>
   </datastores>
</clientProperties>

Here "spark.master" and "spark.app.name" properties are mandatory. User can add more [spark related properties] (http://spark.apache.org/docs/latest/configuration.html#available-properties) as per their need.

Entity

@Entity
@Table(name = "spark_person")
public class Person implements Serializable
{

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** The person id. */
    @Id
    private String personId;

    /** The person name. */
    private String personName;

    /** The age. */
    private int age;

    /** The salary. */
    private Double salary;

   // setters and getters. 
}

Basic Configuration

Users need to set the path and format of the FS/HDFS file they want to read from or write to. These parameters are set as entity-manager-level properties, as shown below:

For HDFS:

    em.setProperty("kundera.hdfs.inputfile.path", "hdfs://localhost:9000/sparkInputTest/input");
    em.setProperty("kundera.hdfs.outputfile.path", "hdfs://localhost:9000/sparkOutputTest/output");

For FS:

    em.setProperty("kundera.fs.inputfile.path", "src/test/resources/csv_input/")
    em.setProperty("kundera.fs.outputfile.path", "src/test/resources/csv_output/")

Format:

    em.setProperty("format", "json");

Note: currently CSV & JSON formats are supported.
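
Putting these together, a minimal sketch of configuring an EntityManager for local CSV files (the paths are illustrative):

    EntityManager em = emf.createEntityManager();
    em.setProperty("kundera.fs.inputfile.path", "src/test/resources/csv_input/");
    em.setProperty("kundera.fs.outputfile.path", "src/test/resources/csv_output/");
    em.setProperty("format", "csv");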

Read-Write Operation

    EntityManagerFactory emf = Persistence.createEntityManagerFactory("spark_hdfs_pu");
    EntityManager em = emf.createEntityManager();

    Person person = new Person();
    person.setAge(23);
    person.setPersonId("1");
    person.setPersonName("Dev");
    person.setSalary(100000.0);

    // save data
    em.persist(person);

    em.clear();

    // read data back by primary key
    Person personFound = em.find(Person.class, "1");

    em.close();
    emf.close();

Query Operation

Select all :

String query = "select * from spark_person"; 
List results = em.createNativeQuery(query).getResultList();

Select with WHERE :

String query = "select * from spark_person where salary > 35000";
List results = em.createNativeQuery(query).getResultList();

Select with LIKE :

String query = "select * from spark_person where personName like 'De%'";
List results = em.createNativeQuery(query).getResultList();

Sum (Aggregation) :

String query = "select sum(salary) from spark_person";
List results = em.createNativeQuery(query).getResultList();
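
The results come back as a raw List. A minimal sketch of consuming them, assuming each row deserializes to a Person entity:

    List results = em.createNativeQuery("select * from spark_person").getResultList();
    for (Object row : results)
    {
        Person p = (Person) row; // assumption: rows map to Person
        System.out.println(p.getPersonName() + " : " + p.getSalary());
    }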

Saving data after Querying

Users can save query results to HDFS/FS as CSV or JSON, or to Cassandra.

General Format:

INSERT INTO <source>.<path-to-table/file> [AS <file-type>] FROM <sql-query>

  • source: To save to the file system, its value can be FS or HDFS; to save to a database, its value is cassandra (currently only Cassandra is supported).

  • path-to-table/file: For FS/HDFS, this is the path to a directory; for a database, it is dbname.tablename.

  • file-type: Required only for FS/HDFS. Its value can be CSV or JSON.

  • sql-query: The result of this SQL query is saved according to the parameters above.

Examples:

To save in FS as CSV:

    String query = "INSERT INTO fs.[src/test/resources/testspark_csv] AS CSV FROM (select * from spark_person)";
    Query q = em.createNativeQuery(query, Person.class);
    q.executeUpdate();

To save in FS as JSON:

    query = "INSERT INTO fs.[src/test/resources/testspark_json] AS JSON FROM (select * from spark_person)";
    q = em.createNativeQuery(query, Person.class);
    q.executeUpdate();
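
To save to Cassandra, the same general format applies with cassandra as the source and dbname.tablename as the target. A hedged sketch, reusing the keyspace from the persistence unit above (the table name person_copy is illustrative, and the kundera-spark-cassandra dependency from the Limitations section is required):

    query = "INSERT INTO cassandra.sparktest.person_copy FROM (select * from spark_person)";
    q = em.createNativeQuery(query, Person.class);
    q.executeUpdate();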

For more details, see this test case.

Limitations

  • SQL queries are supported, not JPA queries (unlike other Kundera modules).
  • Currently supported databases are MongoDB & Cassandra.
  • Currently CSV & JSON formats are supported for FS & HDFS.
  • Query output can be saved in the CSV & JSON formats for FS & HDFS. Saving it to Cassandra requires the kundera-spark-cassandra dependency, which can be found here (a hedged pom sketch is shown below). It can't be saved in MongoDB.
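
A minimal sketch of that dependency (the groupId is assumed to match kundera-spark; verify against the published artifact):

<dependency>
     <groupId>com.impetus.kundera.client</groupId>
     <artifactId>kundera-spark-cassandra</artifactId>
     <version>${kundera.version}</version>
</dependency>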