In [1]:
# Configurations related to Cassandra connector & Cluster
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.1 --conf spark.cassandra.connection.host=192.168.1.1,192.168.1.2,192.168.1.3,192.168.1.4,192.168.1.5,192.168.1.6 pyspark-shell '

# Creating PySpark Context
from pyspark import SparkContext
sc = SparkContext("local[*]", "blast app")

# Creating PySpark SQL Context
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [None]:
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import udf
from pyspark.sql.functions import min

#ReferenceFilename = '../Datasets/References/GRCh38_latest_genomic_10K.fna'
ReferenceFilename = '../Datasets/References/Example.txt'
ReferenceName = "example"
ReferenceHashTableName = "hash"
ReferenceContentTableName = "sequences"
contblocksize = 100

if (True):
    # Create Cassandra reference table
    CreateCassandraReferenceTables(ReferenceName)

    # Read Reference file (offset,line) from hdfs
    reference_rdd = sc.newAPIHadoopFile(
        ReferenceFilename,
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
    )

    # Create DataFrame  
    reference_df = sqlContext.createDataFrame(reference_rdd,["file_offset","line"])
    
    # Delete first line header if exist
    header = reference_df.first()
    header_size = 0
    if (header.line[0]=='>'):
        header_size = len(header.line)+1
        df = reference_df.filter(reference_df.file_offset!=0)
    else:
        df = reference_df

    if (DDebug):
        print("CreateReferenceContent::Show original dataframe:".format(df.show()))

    # Calculate BlockId
    df1 = df.withColumn("blockid", (df.file_offset / contblocksize).cast("int"))
    if (DDebug):
        print("CreateReferenceContent::Show distincs groups:".format(df1.show()))
    
    # Concatenate lines to create blocks
    my_window = Window.partitionBy("blockid").orderBy("offset")
    grouped_df1 = df1.groupby('blockid').agg(collect_list("line").alias("value")).drop('line')
    if (DDebug):
        print("CreateReferenceContent::Agrupped 1 dataframe:".format(grouped_df1.show(10)))   

    grouped_df2 = df1.groupby('blockid').agg(min("file_offset").alias("offset"))
    if (DDebug):
        print("CreateReferenceContent::Agrupped 2 dataframe:".format(grouped_df2.show(10)))    
        
    joined_df = grouped_df1.join(grouped_df2, grouped_df1.blockid==grouped_df2.blockid).drop(grouped_df2.blockid)
    if (DDebug):
        print("CreateReferenceContent::Joined dataframe:".format(joined_df.show(10)))   

    concat_list = udf(lambda lst: "".join(lst), StringType())
    cass_reference_df = joined_df.withColumn("value", concat_list(grouped_df1.value)) \
                          .withColumn("size", F.length("value")) \
                          .sort(col("offset").asc())
    if (DDebug):
        print("CreateReferenceContent::Agrupped 2 dataframe:".format(cass_reference_df.show(10)))    
        
    # Write Reference Dataframe to cassandra.   
    cass_reference_df.write.format("org.apache.spark.sql.cassandra").mode('append').options(table=DReferenceContentTableName, keyspace=ReferenceName).save()


In [None]:
from pyspark.sql.functions import col

if (True):
    grouped_df = joined_df.withColumn("value", concat_list(grouped_df1.value)) \
                          .withColumn("size", F.length("value"))  \
                          .sort(col("offset").asc())
    if (DDebug):
        print("CreateReferenceContent::Agrupped 2 dataframe:".format(grouped_df.show(10)))   

In [14]:
# PySpark program to create blast reference hash on cassandra using spark
# Usage: SparkBlast_CreateReference <Reference_Files> [Key_size=11] [ReferenceName=blast] [Method=1] [PartitionSize=0] [ContentBlockSize=1000]


import pyspark
from pyspark import SparkContext
from pyspark.conf import SparkConf 
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import Row, StringType, StructType, _infer_schema, _has_nulltype, _merge_type, _create_converter
from pyspark.sql.functions import udf, upper, desc, collect_list, udf, min, col, row_number
from pyspark.sql.types import StringType
from cassandra.cluster import Cluster
from enum import Enum
from time import time
import os, shutil, sys, subprocess, math
from subprocess import PIPE, Popen
import random
from datetime import datetime


#import SparkBlast_CreateReferenceContent


## Constants
DDoTesting = True
DDebug = False
DTiming = True
APP_NAME = "SparkBlast_CreateHash_"
DKeySize = 11
DReferenceFilename = '../Datasets/References/GRCh38_latest_genomic_10K.fna'
#DReferenceFilename = '../Datasets/References/Example.txt'
DOutputFilename = '../Output/GRCh38_latest_genomic_10K'
#DOutputFilename = "hdfs://babel.udl.cat:8020/user/nando/output/references/GRCh38_latest_genomic/"
DReferenceName = "blast"
DReferenceHashTableName = "hash"
DReferenceContentTableName = "sequences"
DCreateWindowWithPartitions = True
DCreateBlocksDataFrame = True
DCreateWindowWithPartitions = False
DNumberPartitions = 1
DMaxNumberStages = 3
DMin_Lines_Partition = 1000
DPartitionBlockSize = 128  * 1024 
DContentBlockSize = 1000
DHashGroupsSize = 120000000

## Types
# Enum Methods:
ECreate2LinesData = 1
ECreate1LineDataWithoutDependencies = 2
ECreateBlocksData = 3
DDefaultMethod = ECreate2LinesData
   
## Content Creationt Methods
EContentCreationSpark = 1
EContentCreationPython = 2
DDefaultContentCreationMethod = EContentCreationPython
    
## Global Variables
Method = DDefaultMethod
CreateWindowWithPartitions = DCreateWindowWithPartitions
BlockSize = DPartitionBlockSize
ContentBlockSize = DContentBlockSize
ContentCreationSpark = DDefaultContentCreationMethod
HashGroupsSize = DHashGroupsSize
StatisticsFileName = None
key_size_bc = 0
hash_groups_size_bc = 0
gt0_bc = time()


## Functions ##

def CreateReference(sc, sqlContext, KeySize, ReferenceFilename, ReferenceName, HashGroupsSize):

    # Broadcast Global variables
    global key_size_bc, gt0_bc, hash_groups_size_bc
    t0 = time()
    gt0_bc = sc.broadcast(t0)
    key_size_bc = sc.broadcast(KeySize)
    hash_groups_size_bc = sc.broadcast(HashGroupsSize)
    
    if (DTiming):
        now = datetime.now()
        date_time = now.strftime("%m/%d/%Y %H:%M:%S")
        executors = sc._conf.get("spark.executor.instances")
        cores = sc.getConf().get("spark.executor.cores")
        memory = sc.getConf().get("spark.executor.memory")
        print("++++++++++++ INITIAL STATISTICS {} +++++++++++++".format(date_time))
        print("+ Reference: {}  \tFile: {}.".format(ReferenceName, ReferenceFilename))
        print("+ Key Size: {}   \tMethod: {}.".format(KeySize, Method))
        print("+ Num Executors: {}  \tExecutors/cores: {}  \tExecutor Mem: {}.".format(executors, cores, memory))
        print("+ Hash Groups Size: {}  \tPartition Size: {}  \tContent Block Size: {}.".format(HashGroupsSize, BlockSize, ContentBlockSize))
        print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")

    # Create Cassandra reference table
    CreateCassandraReferenceTables(ReferenceName)

    # Read Reference file (offset,line) from hdfs
    reference_rdd = sc.newAPIHadoopFile(
        ReferenceFilename,
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
    )
    if (DDebug):
        print("Reference:: Input file has {} lines and {} partitions: ".format(reference_rdd.count(),reference_rdd.getNumPartitions()))
        print("Reference::First 10 records: "+format(reference_rdd.take(10)))
        #reference_rdd.count()

    # Create de Reference Content Table
    # Problem: The reference content table has to be created using a standalone python app using linux file.
    #dfc = CreateReferenceContent(reference_rdd, sqlContext, ReferenceFilename, ContentBlockSize, ReferenceName)
   
    t1a = time()
    
    # Create de Reference Index dataframe
    df = CreateDataFrame(reference_rdd)

    # Calculate keys
    t1 = time()
    #df.where((df.offset>16400000)).show(10000)
    #df.show(100000)
    result = df.rdd.flatMap(generateReferenceKeys)
    print(result.count())
    print(result.take(1000))

    result = result.groupByKey().map(lambda r: (Row(seq=r[0][0],block=r[0][1],value=list(r[1]))))   
    t2 = time()
        
    if (False):
        result = df.rdd.flatMap(generateReferenceKeys)
        result_bykey = result.groupByKey()
        #result_bykey = result_bykey.filter(
        #result_bykey.persist()
        #result_count = result_bykey.map(lambda r: (r[0],len(r[1])))
        #print("-----------> Maximun number of offsets: {}".format(result_count.max(key=lambda x:x[1])))
        result = result_bykey.map(lambda r: (Row(seq=r[0][0],block=r[0][1],value=list(r[1]))))   

        if (False):
            result.persist()
            print(result.take(10))        
            print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
            print("Spark version: {}.".format(sc.version))
            print("Reference:: Result has {} keys and {} partitions: ".format(result.count(),result.getNumPartitions()))
            new_partitions = int(result.count()/100.0)
            if (new_partitions > 200):
                print("New Partitions: {}.".format(new_partitions))
                result_df= result.toDF().repartition(new_partitions)
            else:
                result_df= result.toDF()
            print("Reference:: Repartitioned Result has {} keys and {} partitions:".format(result_df.count(),result_df.rdd.getNumPartitions()))  
            print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")


    # Calculate Reference keys and write to cassandra   
    #result.toDF().write.format("org.apache.spark.sql.cassandra").mode('append').options(table=DReferenceHashTableName, keyspace=ReferenceName).save()
    result.toDF().write.format("org.apache.spark.sql.cassandra").mode('overwrite').options(table=DReferenceHashTableName, keyspace=ReferenceName).option("confirm.truncate","true").save()   
    tc = time()

    if (DTiming):
        executors = sc._conf.get("spark.executor.instances")
        cores = sc.getConf().get("spark.executor.cores")
        memory = sc.getConf().get("spark.executor.memory")
        print("########################### FINAL STATISTICS {} ###########################".format(date_time))
        print("# Reference: {}  \tFile: {}.".format(ReferenceName, ReferenceFilename))
        print("# Key Size: {}            \tMethod: {}.".format(KeySize, Method))
        print("# Num Executors: {}      \tExecutors/cores: {}      \tExecutor Mem: {}.".format(executors, cores, memory))
        print("# Hash Groups Size: {}  \tPartition Size: {}  \tContent Block Size: {}.".format(HashGroupsSize, BlockSize, ContentBlockSize))
        print("# Total Time: {}         \tData Read Time: {}   \tData Frame Time: {}.".format(round(tc-t0,3), round(t1a-t0,3), round(t1-t1a,3)))
        print("# Key Calc Time: {}   \tCassandra Write Time: {}.".format(round(t2-t1,3), round(tc-t2,3)))
        print("############################################################################################")
        #result.persist()
        #print("# Total time required for processing {} keys using {} partitions in {} seconds.".format(result.count(), result.getNumPartitions(), round(tt,3)))
        #print("# Reference data size: {} MBytes.\n".format(round(get_size(ReferenceFilename)/(1024.0*1024.0),3)))
        if (StatisticsFileName):
            write_statistics(StatisticsFileName, ReferenceFilename, ReferenceName, KeySize, date_time,result.count(), result.getNumPartitions() , tc-t0, t1a-t0, t1-t1a, t2-t1, tc-t2)

    print("Done.")

    
    return

#
# Create the sequences and hash tables for the Reference
#
def CreateCassandraReferenceTables(ReferenceName):
    #cluster = Cluster(['babel.udl.cat', 'client1.babel', 'client2.babel', 'client3.babel', 'client4.babel',' client5.babel'])
    #cluster = Cluster(['192.168.1.1', '192.168.1.2', '192.168.1.3', '192.168.1.4', '192.168.1.5', '192.168.1.6'])
    cluster = Cluster(['192.168.1.1'])
    session = cluster.connect()
    session.execute("CREATE KEYSPACE IF NOT EXISTS "+ ReferenceName +" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };")
    session.execute("DROP TABLE IF EXISTS "+ ReferenceName +"."+ DReferenceHashTableName +";")
    session.execute("CREATE TABLE "+ ReferenceName + "."+ DReferenceHashTableName +" (seq text, block int, value list<bigint>, PRIMARY KEY(seq,block));")
    #session.execute("DROP TABLE IF EXISTS "+ ReferenceName +"."+ DReferenceContentTableName +";")
    #session.execute("CREATE TABLE "+ ReferenceName + "."+ DReferenceContentTableName +" (blockid bigint, offset bigint, size int, value text, PRIMARY KEY(blockID));")
    session.shutdown()
    cluster.shutdown()
       
    if (DDebug):
        print("Cassandra Database Created.")
        
    
#
# Create a table to store the reference content on Cassandra, indexed by the block.    
#
def CreateReferenceContent(reference_rdd, sqlContext, referenceFilename, contblocksize, referenceName): 

    t1 = time()
    
    if (ContentCreationSpark==EContentCreationSpark):
        df = CreateReferenceContentSpark(reference_rdd, sqlContext, contblocksize, referenceName)
    elif (ContentCreationSpark==EContentCreationPython):
        df = CreateReferenceContentPython(sqlContext, referenceFilename, contblocksize, referenceName)
    else:
        print("CreateReferenceContent unknown content creation method {}."+format(ContentCreationSpark))
    
    if (DTiming):
        t2 = time()
        print("Time required for create reference dataframe, with block size {} in {} seconds.\n".format(contblocksize, round(t2 - t1,3)))

    
#
# Create a table to store the reference content on Cassandra, indexed by the block.    
#
def CreateReferenceContentSpark(reference_rdd, sqlContext, contblocksize, referenceName):  
    print("Method: CreateReferenceContent")
    
    t1 = time()
       
    # Create DataFrame  
    reference_df = sqlContext.createDataFrame(reference_rdd,["file_offset","line"])
    
    # Delete first line header if exist
    header = reference_df.first()
    header_size = 0
    if (header.line[0]=='>'):
        header_size = len(header.line)+1
        df = reference_df.filter(reference_df.file_offset!=0)
    else:
        df = reference_df

    if (DDebug & False):
        print("CreateReferenceContent::Show original dataframe:".format(df.show()))

    # Calculate BlockId
    df1 = df.withColumn("blockid", (df.file_offset / contblocksize).cast("int"))
    if (DDebug& False):
        print("CreateReferenceContent::Show distincs groups:".format(df1.show()))
    
    # Concatenate lines to create blocks
    my_window = Window.partitionBy("blockid").orderBy("offset")
    grouped_df1 = df1.groupby('blockid').agg(collect_list("line").alias("value")).drop('line')
    if (DDebug & False):
        print("CreateReferenceContent::Agrupped 1 dataframe:".format(grouped_df1.show(10)))   

    grouped_df2 = df1.groupby('blockid').agg(min("file_offset").alias("offset"))
    if (DDebug & False):
        print("CreateReferenceContent::Agrupped 2 dataframe:".format(grouped_df2.show(10)))    
        
    joined_df = grouped_df1.join(grouped_df2, grouped_df1.blockid==grouped_df2.blockid).drop(grouped_df2.blockid)
    if (DDebug & False):
        print("CreateReferenceContent::Joined dataframe:".format(joined_df.show(10)))   

    concat_list = udf(lambda lst: "".join(lst), StringType())
    cass_reference_df = joined_df.withColumn("value", concat_list(grouped_df1.value)) \
                          .withColumn("size", F.length("value")) \
                          .sort(col("offset").asc())
        
    if (DDebug):
        print("CreateReferenceContent::Agrupped 2 dataframe:".format(cass_reference_df.show(10)))    
        
    t2 = time()
    
    # Write Reference Dataframe to cassandra.   
    cass_reference_df.write.format("org.apache.spark.sql.cassandra").mode('append').options(table=DReferenceContentTableName, keyspace=referenceName).save()

    if (DTiming):
        print("Time required for write cassandra reference table with block size {} in {} seconds.\n".format(contblocksize, round(time() - t2,3)))
        tt = time() - t1
        print("Total time to create reference table: {} seconds.".format(round(tt,3)))
    
    return cass_reference_df 


#
# Create a table to store the reference content on Cassandra, indexed by the block.    
#
def CreateReferenceContentPython(sqlContext, referenceFilename, contblocksize, referenceName):  
    
    if (DDebug):
        print("CreateReferenceContentPython")
    
    # Create Cassandra Table
    obj = SparkBlast_CreateReferenceContent. SparkBlast_CreateReferenceContent(referenceFilename, referenceName, contblocksize)
    obj.CreateReferencoContentTable()
    
    # Create and load RDD from Cassandra Table.
    cass_reference_df = sqlContext.read.format("org.apache.spark.sql.cassandra").\
                                  load(keyspace=referenceName, table=DReferenceContentTableName)
    if (DDebug):    
        print(cass_reference_df)
        cass_reference_df.show(10)
    
    return cass_reference_df

    
#
# Crates dataframe (offset,size,lines) from imput file rdd.
#       
def CreateDataFrame(reference_rdd):

    # Create DataFrame  
    reference_df = sqlContext.createDataFrame(reference_rdd,["file_offset","line"])
    #reference_df = sqlc.createDataFrame(reference_rdd.take(5000),["file_offset","line"])

    # Delete first line header if exist
    header = reference_df.first()
    header_size = 0
    if (header.line[0]=='>'):
        header_size = len(header.line)+1
        df1 = reference_df.filter(reference_df.file_offset!=0)
    else:
        df1 = reference_df
           
    if (Method==ECreate2LinesData):
        df = Create2LinesDataFrame(df1, header_size, BlockSize)
    elif (Method==ECreate1LineDataWithoutDependencies):
        df = Create1LineDataFrameWithoutDependencies(df1, header_size)
    elif (Method==ECreateBlocksData):
        df = CreateBlocksDataFrame(df1, header_size, BlockSize)

    return df
    
    
def Create2LinesDataFrame(df, header_size, blocksize):
    
    t1 = time()
    
    if (False and CreateWindowWithPartitions and blocksize>0):
        df = df.withColumn("block", (df.file_offset / blocksize).cast("int"))
        my_window = Window.partitionBy("block").orderBy("file_offset")
        print("Spark shuffle partitions:".format(sqlContext.getConf("spark.sql.shuffle.partitions")))
    else: # Without partitions
        my_window = Window.partitionBy().orderBy("file_offset")
          
    df1 = df.withColumn("line", upper(df.line)) \
           .withColumn("id",row_number().over(my_window))  
    df2 = df1.withColumn("next_line", F.lag(df1.line,-1).over(my_window))   
    df3 = df2.withColumn("size", F.length(df2.line)) \
             .withColumn("lines", F.when(F.isnull(df2.next_line), df2.line) \
                                   .otherwise(F.concat(df2.line, df2.next_line))) \
             .withColumn("offset", (df2.file_offset-header_size)-(df2.id-1)) \
             .drop(df2.line).drop(df2.next_line).drop(df2.file_offset).drop(df2.id)
    
    # Calculate Number of Partitions    
    executors = sc._conf.get("spark.executor.instances")
    cores = sc.getConf().get("spark.executor.cores")
    if executors is None or cores is None:
        total_cores = 1
    else:
        total_cores = int(executors) * int(cores)
    max_partitions = DMaxNumberStages * total_cores
    lines = df3.count()
    lines_part = lines/max_partitions
    if (lines_part>DMin_Lines_Partition):
        NumberPartitions = max_partitions
    else:
        NumberPartitions = lines/DMin_Lines_Partition
    
    print("#### Number of partitions:".format(NumberPartitions))
    df3 = df3.repartition(NumberPartitions)
    S    
    if (DTiming):
        #print("Time required for read and prepare dataframe with {} rows using {} partitions in {} seconds.\n".format(df3.count(), df3.rdd.getNumPartitions(), round(time() - gt0_bc.value,3)))
        print("Time required for read and prepare dataframe in {} seconds.\n".format(round(time() - t1,3)))

    return df3


def Create1LineDataFrameWithoutDependencies(df, header_size):
    # Create Blocks of lines to avoid dependencies with the previous line.
    # Ref: https://stackoverflow.com/questions/49468362/combine-text-from-multiple-rows-in-pyspark
  
    my_window = Window.partitionBy().orderBy("file_offset")
    df1 = df.withColumn("lines", upper(df.line)) \
           .withColumn("id",row_number().over(my_window))
    df3 = df.withColumn("size", F.length(df.lines)-DKeySize) \
             .withColumn("offset", (df.file_offset-header_size)-(df2.id-1)) \
             .drop(df.line).drop(df.file_offset).drop(df.id)    
    #df3.persist()
    #df3.rdd.getNumPartitions() 
    #print("Number of total rows: {} with {} partitions".format(df3.count(),df3.rdd.glom().count()))
    #print("DF3:")
    #df3.show(20) 
    #val rdd = sc.cassandraTable("test", "words")
    #print("Time required for read and prepare dataframe with {} rows using {} partitions in {} seconds.\n".format(df3.count(), df3.rdd.getNumPartitions(), round(time() - gt0_bc.value,3)))
    
    if (DTiming):
        print("Time required for read and prepare dataframe in {} seconds.\n".format(round(time() - gt0_bc.value,3)))

    return df3
      
    
def CreateBlocksDataFrame(dfc, header_size, blocksize):
    if (DDebug):
        print("Method: ECreateBlocksData")
        dfc.show(10)

    #return CreateBlocksDataFrame2(dfc)   
    t1 = time()
    
    global key_size_bc
    keySize = key_size_bc.value
    
    if (CreateWindowWithPartitions and blocksize>0):
        dfc = dfc.withColumn("block", (dfc.offset / blocksize).cast("int"))
        my_window = Window.partitionBy("block").orderBy("file_offset")
        if (DDebug):
            print("Spark shuffle partitions:".format(sqlContext.getConf("spark.sql.shuffle.partitions")))
    else: # Without partitions
        my_window = Window.partitionBy().orderBy("file_offset")
       
    #dfc.show(20)
    #my_window = Window.partitionBy().orderBy("offset")     
    #df1 = dfc.select(dfc.value.substr(0,keySize).alias("prefix"))
    df0 = dfc.withColumn("lines", upper(dfc.line))
    df1 = df0.withColumn("prefix",df0.lines.substr(0,keySize-1))
    df2 = df1.withColumn("next_line", F.lag(df1.prefix,-1).over(my_window))
    df3 = df2.withColumn("lines", F.when(F.isnull(df2.next_line), df2.lines) \
                                   .otherwise(F.concat(df2.lines, df2.next_line))) \
             .withColumn("offset", df2.file_offset-header_size) 
    df3 = df3.withColumn("size", F.length(df3.lines)) 
    #df3.sort(col("offset").asc()).show(10)
    df3 = df3.drop("prefix").drop("next_line").drop("line").drop(df3.file_offset) 
    #df3.sort(col("offset").asc()).show(10)
      
    if (DTiming):
        print("Time required for read and prepare dataframe in {} seconds.\n".format(round(time() - t1,3)))
    
    return df3


def CreateBlocksDataFrame2(dfc):
    if (DDebug):
        print("Method: ECreateBlocksData2")
 
    t1 = time()
    
    df3 = dfc.withColumn("lines",dfc.value).drop("value")
    
    if (DDebug):
        print("Dataframe done with {} rows using {} partitions in {} seconds.\n".format(df3.count(), df3.rdd.getNumPartitions()))
    if (DTiming):
        print("Time required for read and prepare dataframe in {} seconds.\n".format(round(time() - t1,3)))
    
    return df3
 


def AddCassandraKey(key, offset):
    listText = str(offset)
    print(key, listText)
    cluster = Cluster(['192.168.1.1', '192.168.1.2', '192.168.1.3', '192.168.1.4', '192.168.1.5', '192.168.1.6'])
    session = cluster.connect()
    session.execute("update blast.sequences set value = value + [%s] where seq =%s;", (listText,key))
    #rddUpdateSeq = sc.parallelize([Row(seq=key,value=listText)])
    #rddUpdateSeq.toDF().write\
    #.format("org.apache.spark.sql.cassandra")\
   # .mode('append')\
   # .options(table="sequences", keyspace="blast")\
   # .save()
    session.shutdown()
    cluster.shutdown()
    return


def generateReferenceKeys(record):
    return generateReferenceKeysR(record.offset, record.size, record.lines, len(record.lines))
    
    
# Generate reference keys with the following tuples {key,offset}
# Does not store the key extension 
def generateReferenceKeysR(offset, size, lines, lines_size):
 
    global key_size_bc, hash_groups_size_bc
    KeySize  = key_size_bc.value
    HashGroupsSize = hash_groups_size_bc.value
    Keys = [] 

    #tg1 = time()
    # Calculate the first and last keys desplazaments.
    first_key = 0
    if (size!=lines_size):    
        # Internal lines.
        last_key = size
    else:
        # Last file line.
        last_key = size - KeySize + 1
    
    forbiden_key = "N" * KeySize
   
    for k in range (first_key, last_key):
        # Add key to casandra
        #AddCassandraKey(lines[k:k+Key_size], offset+k)
        
        # Add key to python list
        #Keys.append({'Key':lines[k:k+Key_size],'Offset':offset+k})
        #Keys.append((lines[k:k+Key_size],str(offset+k)))
        #Keys.append({'seq':lines[k:k+KeySize],'value':[offset+k]})
        #Keys.append(Row(seq=lines[k:k+KeySize],value=[offset+k]))
        if (lines[k:k+KeySize]!=forbiden_key):
            if (offset+k)<0:
                print("ERROR::generateReferenceKeysR ({}, {}, {}, {})->{}".format(offset, size, lines, lines_size, offset+k))
                error
            if (lines[k:k+KeySize]=='GAATGCTGTGT'):
                print("##### DEBUG::generateReferenceKeysR ({}, {}, {}, {})->{}".format(offset, size, lines, lines_size, offset+k))
            #    error
            #if ((offset+k)>160000000):
            #    print("##### DEBUG::generateReferenceKeysR ({}, {}, {}, {})->{}".format(offset, size, lines, lines_size, offset+k))
            
            Keys.append(((lines[k:k+KeySize],(offset+k)/HashGroupsSize),offset+k))
        
    #print("\rProcessing {} keys from offset {} in {} secs".format(len(Keys),offset, round(time() - gt0_bc.value,3), end =" "))
    
    return Keys


def write_statistics(statisticsFileName, referenceFilename, referenceName, keySize, date, keysNumber, partitions, totalTime, readTime, dfTime, keyTime, casTime):

    DStatisticsFileHeader = "Procedure; Reference File Name ; Reference Name ; Date  ; Number of executors ; Cores/Executor ; Memory/Executor ; Total Time (sec) ; Data Read Time (sec) ; Data Frame Time (sec) ; Key Calc Time (sec) ; Cassandra Write Time (sec) ; Key Size; Number of Keys ; Number of partitions ; Method ; Hash Group Size ; Hash Groups Size ; Block Size ; Content Block Size ; "
    DHdfsHomePath = "hdfs://babel.udl.cat/user/nando/"
    DHdfsTmpPath = DHdfsHomePath + "Tmp/"

    executors = sc._conf.get("spark.executor.instances")
    cores = sc.getConf().get("spark.executor.cores")
    memory = sc.getConf().get("spark.executor.memory")
    
    new_stats = "SparkBlast Create Hash ; %s ; %s ; %s ; %s ; %s ; %s ; %f ; %f ; %f ; %f ; %f ; %d ; %d ; %d ; %d ; %d ; %d ; %d ; " % (referenceFilename, referenceName, date, executors, cores, memory, totalTime, readTime, dfTime, keyTime, casTime, keySize, keysNumber, partitions, Method, HashGroupsSize, BlockSize, ContentBlockSize)

        
    if (False):
        # Generate statistics in linux file.
        if os.path.isfile(statisticsFileName):
            statisticsFile = open(statisticsFileName,"a")
        else:
            statisticsFile = open(statisticsFileName,"w")
            statisticsFile.write(DStatisticsFileHeader)    

        #new_stats = "SparkBlast Create Hash ; " + referenceFilename + " ; " + referenceName + " ; " + str(totalTime) + " ; " + str(readTime) + " ; "+ str(dfTime) + " ; " + str(keyTime) + " ; " + str(casTime) + " ; " + str(keySize) + " ; " + str(keysNumber) + " ; " + str(partitions) + " ; " + str(Method) + " ; " + str(HashGroupsSize) + " ; " + str(BlockSize) + " ; " + str(ContentBlockSize) + " ;")
        statisticsFile.write(new_stats + "\n")
        statisticsFile.close()

        # Copy statistics file into hdfs
        #cmd = ['hdfs', 'dfs', '-put', statisticsFileName, DHdfsResultsPath]
        #run_cmd(cmd)

        print(new_stats)
        executors = sc._conf.get("spark.executor.instances")
        cores = sc.getConf().get("spark.executor.cores")
        memory = sc.getConf().get("spark.executor.memory")
        print("Print number of executors: {} \tExecutor cores: {} \tExecutors memory".format(executors, cores, memory))

    
    print("File {} exists? {}".format(DHdfsHomePath+statisticsFileName, check_file(DHdfsHomePath+statisticsFileName)))
 
    # Generate statistics in hdfs using rdd.
    # Read statstics file
    if (not check_file(DHdfsHomePath+statisticsFileName)):
        new_stats_rdd = sc.parallelize([new_stats],1)
    else:
        new_stats_rdd = sc.parallelize([DStatisticsFileHeader, new_stats],1)
        
    OutputFile = DHdfsTmpPath+"tmp"+str(random.randint(1, 10000))
    remove_file(OutputFile)
    new_stats_rdd.saveAsTextFile(OutputFile)
  
    cmd = ['hdfs', 'dfs', '-getmerge',OutputFile+"/part-*", "/tmp/prueba"]
    if (run_cmd(cmd)):
        print("Error getmerge")
    cmd = ['hdfs', 'dfs', '-appendToFile',"/tmp/prueba", DHdfsHomePath+statisticsFileName]
    if (run_cmd(cmd)):
        print("Error appendToFile")
    
    remove_file(OutputFile)         


def TestCreateReference(sc, KeySize, ReferenceName):
    
    # Broadcast Global variables
    global key_size_bc, gt0_bc
    t0 = time()
    gt0_bc = sc.broadcast(t0)
    key_size_bc = sc.broadcast(KeySize)
    
    # Test input data
    line1='123456789012345678901234567890'
    line2='abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz'
    line3='ZYXWVUTSRQPONMLKJIHGFEDCBAZYXWVUTSRQPONMLKJIHGFEDCBA'
    line4='098765432109876543210987654321'

    # Processing the first line:
    offset = 0
    despl = 0
    size = len(line1)
    lines = line1+line2
    lines_size = len(lines)
    keys1 = generateReferenceKeysR(offset, size, lines, lines_size)
#    keys1_sorted = sorted(keys1, key=lambda d: d['Offset']) 
    print("Keys first line:"+format(keys1))

    # Processing the second line:
    offset = len(line1)
    despl = len(line1)
    size = len(line2)
    lines = line1+line2+line3
    lines_size = len(lines)
    keys2 = generateReferenceKeysR(offset, size, lines, lines_size)
    print("Keys second line:"+format(keys2))

    # Processing the third line:
    offset += len(line2)
    despl = len(line2)
    size = len(line3)
    lines = line2+line3+line4
    lines_size = len(lines)
    keys3 = generateReferenceKeysR(offset, size, lines, lines_size)
    print("Keys third line:"+format(keys3))

    # Processing the last line (line4):
    offset += len(line3)
    despl = len(line3)
    size = len(line4)
    lines = line3+line4
    lines_size = len(lines)
    keys4 = generateReferenceKeysR(offset, size, lines, lines_size)
    print("Keys fourth line:"+format(keys4))
    print("Number of keys:"+format(len(line1)+len(line2)+len(line3)+len(line4)-KeySize+1))




def array_to_string(my_list):
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'


def CreateReferenceToFile(sc, sqlContext, KeySize, ReferenceFilename, OutputFilename):

    print("CreateReferenceToFile: {}, {}, {}.".format(KeySize, ReferenceFilename, OutputFilename))
    
    print round(get_size()/(1024.0*1024.0),3)

    if os.path.exists(OutputFilename): 
        shutil.rmtree(OutputFilename)
        os.system("hdfs dfs -rm -R "+OutputFilename)
        
    # Broadcast Global variables
    global key_size_bc, gt0_bc
    t0 = time()
    gt0_bc = sc.broadcast(t0)
    key_size_bc = sc.broadcast(KeySize)

    # Register UDF function
    #printRecord_udf = udf(printRecord)
    #decOffset_udf = udf(decOffset, LongType())
        

    # Read Reference file (offset,line) from linux
    '''
    reference_rdd = sc.newAPIHadoopFile(
        'Datasets/References/GRCh38_latest_genomic.fna.gz',
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
    )
    # Read Reference file (offset,line) from hdfs
    reference_rdd = sc.newAPIHadoopFile(
        '/user/nando/Datasets/References/GRCh38_latest_genomic.fna.gz',
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
    )
    '''
    # Read Reference file (offset,line) from hdfs
    reference_rdd = sc.newAPIHadoopFile(
        ReferenceFilename,
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
    )

    #reference_rdd.cache()
    print("Reference::Number of partitions: "+format(reference_rdd.getNumPartitions()))
    #print("Reference::First 10 records: "+format(reference_rdd.take(10)))
    #reference_rdd.count()

    reference_df = sqlContext.createDataFrame(reference_rdd,["file_offset","line"])
    #reference_df = sqlContext.createDataFrame(reference_rdd.take(1000),["file_offset","line"])
    header = reference_df.first()
    header_size = len(header.line)+1
    df1 = reference_df.filter(reference_df.file_offset!=0)

    my_window = Window.partitionBy().orderBy("file_offset")
    df1 = df1.withColumn("line", upper(df1.line))
    df2a = df1.withColumn("next_line", F.lag(df1.line,-1).over(my_window))
    df3 = df2a.withColumn("size", F.length(df2a.line)) \
           .withColumn("lines", F.when(F.isnull(df2a.next_line), df2a.line)
                                 .otherwise(F.concat(df2a.line, df2a.next_line))) \
           .withColumn("offset", df2a.file_offset-header_size) \
           .drop(df2a.line).drop(df2a.next_line).drop(df2a.file_offset)
    #df3.persist()
    #print("Number of total rows:"+format(df3.count()))
    #print("DF3:")
    #df3.show(20)

    #tt = time() - gt0_bc.value
    print("Time required for read and prepare dataframe with {} rows using {} partitions in {} seconds.\n".format(df3.count(), df3.rdd.getNumPartitions(), round(time() - gt0_bc.value,3)))
    t1 = time()

    #my_window = Window.partitionBy().orderBy("offset")
    #df2b = reference_df.withColumn("next_offset", F.lag(reference_df.offset,-1).over(my_window))
    #df3b = df2.withColumn("two_lines", F.when(F.isnull(F.concat(df2.prev_line, df2.line)), df2.line)
    #                                   .otherwise(F.concat(df2.prev_line, df2.line)))
    #print("DF2B:")
    #df2b.show()

    #df3 = df3.filter(df3.offset>20000)

    #df4.foreach(printRecord)

    #df5 = df4.select("offset", "two_lines", printRecord3("offset", "two_lines"))
    #print("DF5:")
    #df5.show()

    # Calculate keys
    result = df3.rdd.map(generateReferenceKeys)
    error
    #print("Number of keys generated: "+format(result.count()))
    #tt = time() - t1
    print("Time required for calculate {} keys using {} partitions in {} seconds.\n".format(result.count(), df3.rdd.getNumPartitions(), round(time() - t1,3)))
    t2 = time()

    # Calculate results dataframe
    #df5 = result.flatMap(lambda list: map(lambda key: (key['Offset'], key['Key'], key['Extension']), list )).toDF(("Offset", "Key", "Extension"))
    #df5 = result.flatMap(lambda list: map(lambda key: (key['Offset'], key['Key']), list)).toDF(("Offset", "Key"))
    df5 = result.flatMap(lambda list: map(lambda key: (key['seq'], key['value']), list)).reduceByKey(lambda x,y: x + y).toDF(("Key", "Offset"))
    df5.persist()
    df5.show(10)
    
    # Write results as linux CSV file
    # Flattening Offset array to be writed as csv
    array_to_string_udf = udf(array_to_string,StringType())
    df5 = df5.withColumn('Offset',array_to_string_udf(df5["Offset"])) 
    # Write results as hdfs CSV file
    df5.write.csv(path=OutputFilename, sep="\t")

    tt = time() - t2
    print("Time required for write {} keys using {} partitions in {} seconds.\n".format(df5.count(), df3.rdd.getNumPartitions(), round(tt,3)))
    #df3.unpersist()
    #df5.unpersist()

    tt = time() - gt0_bc.value
    print("Total time required for processing {} keys using {} partitions in {} seconds.".format(df5.count(), df3.rdd.getNumPartitions(), round(tt,3)))
    print("Reference data size: {} MBytes.\n".format(round(get_size(OutputFilename)/(1024.0*1024.0),3)))
    print("Done.")



def run_cmd(args_list):
    #print('Running system command: {0}'.format(' '.join(args_list)))
    proc = subprocess.Popen(args_list, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
    proc.communicate()
    return proc.returncode
   

def check_file(hdfs_file_path):
    cmd = ['hdfs', 'dfs', '-test', '-e', hdfs_file_path]
    code = run_cmd(cmd)
    return code

  
def remove_file(hdfs_file_path):
    cmd = ['hdfs', 'dfs', '-rm', '-R', hdfs_file_path]
    code = run_cmd(cmd)
    return code


def get_size(start_path = '.'):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(start_path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            total_size += os.path.getsize(fp)
    return total_size


def main(sc, sqlContext, keySize=DKeySize, referenceFilename=DReferenceFilename, referenceName="Sequences", hashGroupsSize=DHashGroupsSize):

    print("main")
    CreateReference(sc, sqlContext, keySize, referenceFilename, referenceName, hashGroupsSize)


## Testing 

if (DDoTesting):
    keySize = DKeySize
    # Test 1
    #print("Testing: {}, {}, {}.".format(filename, keySize, referenceName))
    #TestCreateReferenceIndex(sc, keySize)

    # Test 2: Generate reference to file
    inputfile = DReferenceFilename
    outputfile = DOutputFilename
    #CreateReferenceIndexToFile(sc, sqlContext, keySize, inputfile, outputfile)

    
    # Test 3: Generate reference in Cassandra
    referenceFilename = '../Datasets/References/DummyReference.txt'
    referenceName = str.lower("Dummy_10000")
    Method = ECreate2LinesData 
    CreateWindowWithPartitions = False
    BlockSize = 100
    ContentBlockSize = 100
    main(sc, sqlContext, keySize, referenceFilename, referenceName)
    
    # Test 3: Generate reference in Cassandra
    referenceFilename = '../Datasets/References/DummyReference.txt'
    referenceName = str.lower("Dummy_10000")
    Method = ECreate2LinesData 
    CreateWindowWithPartitions = True
    BlockSize = 100
    ContentBlockSize = 100
    main(sc, sqlContext, keySize, referenceFilename, referenceName)
    
    
    # Test 4a: Generate reference in Cassandra
    referenceFilename = '../Datasets/References/GRCh38_latest_genomic_5P.fna'
    referenceName = str.lower("GRCh38_5P_3")
    #referenceFilename = '../Datasets/References/Example.txt'
    #referenceName = str.lower("Example")
    Method = ECreate2LinesData 
    #ContentBlockSize = 100
    Method = 1
    CreateWindowWithPartitions = False
    BlockSize = 128*1024
    ContentBlockSize = 100000
    #main(sc, sqlContext, keySize, referenceFilename, referenceName)
    
    # Test 4b: Generate reference in Cassandra
    referenceFilename = '../Datasets/References/GRCh38_latest_genomic_10K.fna'
    referenceName = str.lower("GRCh38_latest_genomic_10K")
    #referenceFilename = '../Datasets/References/Example.txt'
    #referenceName = str.lower("Example")
    Method = ECreateBlocksData 
    ContentBlockSize = 100
    #main(sc, sqlContext, keySize, referenceFilename, referenceName)
    
    # Test 4c: Generate reference in Cassandra
    referenceFilename = '../Datasets/References/GRCh38_latest_genomic_10K.fna'
    referenceName = str.lower("GRCh38_latest_genomic_10K")
    #referenceFilename = '../Datasets/References/Example.txt'
    #referenceName = str.lower("Example")
    Method = ECreateBlocksData 
    ContentBlockSize = 1000
    #main(sc, sqlContext, keySize, referenceFilename, referenceName)

    # Test 4c: Generate reference in Cassandra
    referenceFilename = '../Datasets/References/GRCh38_latest_genomic_10K.fna'
    referenceName = str.lower("GRCh38_latest_genomic_10K")
    #referenceFilename = '../Datasets/References/Example.txt'
    #referenceName = str.lower("Example")
    Method = ECreateBlocksData 
    ContentBlockSize = 10000
    #main(sc, sqlContext, keySize, referenceFilename, referenceName)
    
    error

## End Testing 

    

if __name__ == "__main__":

    ## Process parameters. (https://docs.python.org/2/library/argparse.html)
    ## SparkBlast_CreateReference <Reference_Files> [Key_size=11] [ReferenceName] [Method] 
    ## [PartitionSize] [ContentBlockSize]
    if (len(sys.argv)<2):
        print("Error parametes. Usage: SparkBlast_CreateReference <Reference_Files> [Key_size={}] [ReferenceName] [Method={}]  [PartitionSize={}] [ContentBlockSize={}] [HashGroupsSize={}].\n".format(DKeySize, DDefaultMethod, DPartitionBlockSize, DContentBlockSize, DHashGroupsSize))
        sys.exit(1)

    ReferenceFilename = sys.argv[1]
    KeySize = DKeySize
    Method = DDefaultMethod
    BlockSize = DPartitionBlockSize
    ContentBlockSize = DContentBlockSize
    HashGroupsSize = DHashGroupsSize
    if (len(sys.argv)>2):
        KeySize = int(sys.argv[2])
    if (len(sys.argv)>3):
        ReferenceName = (sys.argv[3]).lower()
    if (len(sys.argv)>4):
        Method = int(sys.argv[4])
    if (len(sys.argv)>5):
        CreateWindowWithPartitions = True
        BlockSize = int(sys.argv[5])
    if (len(sys.argv)>6):
        ContentBlockSize = int(sys.argv[6])
    if (len(sys.argv)>7):
        HashGroupsSize = int(sys.argv[7])   
    if (len(sys.argv)>8):
        StatisticsFileName = sys.argv[8]

    ## Configure Spark
    conf = SparkConf().setAppName(APP_NAME+ReferenceName)
    sc   = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    random.seed()
    
    t0 = time()
    gt0_bc = sc.broadcast(t0)
        
    # Execute Main functionality
    print("{}({}, {}, {}, {}, {}, {}, {}).".format(sys.argv[0],ReferenceFilename, KeySize, ReferenceName, Method,  BlockSize, ContentBlockSize, HashGroupsSize))
    main(sc, sqlContext, KeySize, ReferenceFilename, ReferenceName, HashGroupsSize)



main
++++++++++++ INITIAL STATISTICS 08/28/2019 20:23:38 +++++++++++++
+ Reference: dummy_10000  	File: ../Datasets/References/DummyReference.txt.
+ Key Size: 11   	Method: 1.
+ Num Executors: None  	Executors/cores: None  	Executor Mem: None.
+ Hash Groups Size: 120000000  	Partition Size: 100  	Content Block Size: 100.
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+----+--------------------+------+
|size|               lines|offset|
+----+--------------------+------+
|  10|00000000410000000042|   400|
|  10|00000000500000000051|   490|
|  10|00000000150000000016|   140|
|  10|00000000720000000073|   710|
|  10|00000000460000000047|   450|
|  10|00000000420000000043|   410|
|  10|00000000930000000094|   920|
|  10|00000000540000000055|   530|
|  10|00000000790000000080|   780|
|  10|00000000890000000090|   880|
|  10|00000000580000000059|   570|
|  10|00000000990000000100|   980|
|  10|00000000800000000081|   790|
|  10|00000000360000000037|   35

########################### FINAL STATISTICS 08/28/2019 20:23:38 ###########################
# Reference: dummy_10000  	File: ../Datasets/References/DummyReference.txt.
# Key Size: 11            	Method: 1.
# Num Executors: None      	Executors/cores: None      	Executor Mem: None.
# Hash Groups Size: 120000000  	Partition Size: 100  	Content Block Size: 100.
# Total Time: 16.889         	Data Read Time: 4.132   	Data Frame Time: 0.687.
# Key Calc Time: 1.138   	Cassandra Write Time: 10.932.
############################################################################################
Done.
main
++++++++++++ INITIAL STATISTICS 08/28/2019 20:23:55 +++++++++++++
+ Reference: dummy_10000  	File: ../Datasets/References/DummyReference.txt.
+ Key Size: 11   	Method: 1.
+ Num Executors: None  	Executors/cores: None  	Executor Mem: None.
+ Hash Groups Size: 120000000  	Partition Size: 100  	Content Block Size: 100.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

########################### FINAL STATISTICS 08/28/2019 20:23:55 ###########################
# Reference: dummy_10000  	File: ../Datasets/References/DummyReference.txt.
# Key Size: 11            	Method: 1.
# Num Executors: None      	Executors/cores: None      	Executor Mem: None.
# Hash Groups Size: 120000000  	Partition Size: 100  	Content Block Size: 100.
# Total Time: 17.662         	Data Read Time: 5.365   	Data Frame Time: 0.633.
# Key Calc Time: 1.128   	Cassandra Write Time: 10.536.
############################################################################################
Done.


NameError: name 'error' is not defined

In [99]:
import random

def write_statistics(statisticsFileName, referenceFilename, referenceName, keySize, keysNumber, partitions, totalTime, readTime, dfTime, keyTime, casTime):

    DStatisticsFileHeader = "Procedure ; Reference File Name ; Reference Name ; Total Time (sec) ; Data Read Time (sec) ; DataFrame Time (sec) ; Key Calc Time (sec) ; Cassandra Write Time (sec) ; Key Size; Number of Keys ; Number of partitions ; Method ; Hash Group Size ; Hash Groups Size ; Block Size ; Content Block Size ; "
    DHdfsHomePath = "hdfs://babel.udl.cat/user/nando/"
    DHdfsResultsPath = DHdfsHomePath + "Results/"
    DHdfsTmpPath = DHdfsHomePath + "Tmp/"

    executors = sc._conf.get("spark.executor.instances")
    cores = sc.getConf().get("spark.executor.cores")
    memory = sc.getConf().get("spark.executor.memory")
        
    new_stats = "SparkBlast Create Hash ; %s ; %s ; %f ; %f ; %f ; %f ; %f ; %d ; %d ; %d ; %d ; %d ; %d ; %d ; " % (referenceFilename, referenceName, totalTime, readTime, dfTime, keyTime, casTime, keySize, keysNumber, partitions, Method, HashGroupsSize, BlockSize, ContentBlockSize)

        
    if (False):
        # Generate statistics in linux file.
        if os.path.isfile(statisticsFileName):
            statisticsFile = open(statisticsFileName,"a")
        else:
            statisticsFile = open(statisticsFileName,"w")
            statisticsFile.write(DStatisticsFileHeader)    

        #new_stats = "SparkBlast Create Hash ; " + referenceFilename + " ; " + referenceName + " ; " + str(totalTime) + " ; " + str(readTime) + " ; "+ str(dfTime) + " ; " + str(keyTime) + " ; " + str(casTime) + " ; " + str(keySize) + " ; " + str(keysNumber) + " ; " + str(partitions) + " ; " + str(Method) + " ; " + str(HashGroupsSize) + " ; " + str(BlockSize) + " ; " + str(ContentBlockSize) + " ;")
        statisticsFile.write(new_stats + "\n")
        statisticsFile.close()

        # Copy statistics file into hdfs
        #cmd = ['hdfs', 'dfs', '-put', statisticsFileName, DHdfsResultsPath]
        #run_cmd(cmd)

        print(new_stats)
        executors = sc._conf.get("spark.executor.instances")
        cores = sc.getConf().get("spark.executor.cores")
        memory = sc.getConf().get("spark.executor.memory")
        print("Print number of executors: {} \tExecutor cores: {} \tExecutors memory".format(executors, cores, memory))

    
    print("File {} exists? {}".format(DHdfsHomePath+statisticsFileName, check_file(DHdfsHomePath+statisticsFileName)))
 
    cmd = ['hdfs', 'dfs', '-cat', DHdfsHomePath+statisticsFileName]
    run_cmd(cmd)
    
    # Generate statistics in hdfs using rdd.
    # Read statstics file
    if (not check_file(DHdfsHomePath+statisticsFileName)):
        new_stats_rdd = sc.parallelize([new_stats],1)
    else:
        new_stats_rdd = sc.parallelize([DStatisticsFileHeader, new_stats],1)
        
    OutputFile = DHdfsTmpPath+"tmp"+str(random.randint(1, 100))
    remove_file(OutputFile)
    new_stats_rdd.saveAsTextFile(OutputFile)
  
    cmd = ['hdfs', 'dfs', '-getmerge',OutputFile+"/part-*", "./prueba"]
    if (run_cmd(cmd)):
        print("Error getmerge")
    cmd = ['hdfs', 'dfs', '-appendToFile',"./prueba", DHdfsHomePath+statisticsFileName]
    if (run_cmd(cmd)):
        print("Error appendToFile")
    
    remove_file(OutputFile)
    

DHdfsHomePath = "/user/nando/"
DHdfsResultsPath = DHdfsHomePath + "Results/"
StatisticsFileName = "Results/CreateHash.res"

write_statistics(StatisticsFileName, "prueba1", "1", 11,  1000, 2, 2.0, 11, 12, 13, 14)

error
    
DStatisticsFileHeader = [("Procedure" , "Reference File Name" , "Reference Name" , "Total Time")]

new_stats = ["preueradf", "dos", str(2), str(344.2)]

stats_rdd = sc.textFile("hdfs://babel.udl.cat/"+DHdfsHomePath+StatisticsFileName)
stat = stats_rdd.collect()
print(stats_rdd.collect())

stats_rdd2 = sc.parallelize(stat+new_stats,1)
print(stats_rdd2.collect())

stats_rdd2.saveAsTextFile("hdfs://babel.udl.cat/"+DHdfsHomePath+StatisticsFileName+"3")

error

stats_rdd = sc.parallelize(DStatisticsFileHeader)
print(stats_rdd.collect())
new_stats_rdd = sc.parallelize(new_stats)
print(new_stats_rdd.collect())

stats_rdd.union(new_stats)
stats_rdd.collect()

File hdfs://babel.udl.cat/user/nando/Results/CreateHash.res exists? 0


NameError: name 'error' is not defined

In [None]:
# Test 1: Generate reference in Cassandra -> 2LinesData & 1 Partition
referenceFilename = DReferenceFilename
referenceName = DReferenceName
CreateWindowWithPartitions = False
main(sc, sqlContext, keySize, referenceFilename, referenceName)

In [None]:
# Test 2: Generate reference in Cassandra -> 1LineDataWithoutDependencies & 1 Partition
referenceFilename = DReferenceFilename
referenceName = DReferenceName
Method = ECreate1LineDataWithoutDependencies
CreateWindowWithPartitions = False
main(sc, sqlContext, keySize, referenceFilename, referenceName)

In [None]:
# Test 3: Generate reference in Cassandra -> 2LinesData & with Partitions
referenceFilename = DReferenceFilename
referenceName = DReferenceName
Method = ECreate2LinesData
CreateWindowWithPartitions = True
main(sc, sqlContext, keySize, referenceFilename, referenceName)

In [None]:
# Test 4: Generate reference in Cassandra -> 1LineDataWithoutDependencies & with Partitions
referenceFilename = DReferenceFilename
referenceName = DReferenceName
Method = ECreate1LineDataWithoutDependencies
CreateWindowWithPartitions = True
main(sc, sqlContext, keySize, referenceFilename, referenceName)