## Flatten a dataframe into a single row

https://stackoverflow.com/questions/50556537/flatten-dataframe-into-a-single-row - actually, no - this is the Pandas solution.

Try this instead -

https://sparkbyexamples.com/spark/how-to-pivot-table-and-unpivot-a-spark-dataframe/

## Setting up the Spark Context

We reset the spark context to test and optimise the parameters we will need to include when batch submitting in cluster mode.

                           ('spark.maxRemoteBlockSizeFetchToMem', 2048), \


#### Optimising the partitions

https://luminousmen.com/post/spark-tips-partition-tuning



In [23]:
numBins=30 ## code in here to optimise and calculate the number of bins we want!


conf = SparkConf().setAll([('spark.executor.memory', '10g'),\
                           ('spark.driver.memory', '10g'),\
                           ('spark.shuffle.service.enabled', True), \
                           ('spark.sql.shuffle.partitions', 768), \
                           ('spark.maxRemoteBlockSizeFetchToMem', '2g'), \
                           ('spark.default.parallelism', 768),\
                           ('spark.dynamicAllocation.enabled', True), \
                           ('spark.dynamicAllocation.executorIdleTimeout', 600), \
                           ('spark.network.timeout', 1200), \
                           ('spark.executor.cores', 4),\
                           #('spark.executor.instances', numBins), \
                           ('spark.sql.codegen.wholeStage', False ), \
                           # to avoid GeneratedIteratorForCOdegenStage grows beyond 64 KB errors
                           # overridden('spark.local.dir','/mnt/FITs/Spark/tmp'),\
                           ('spark.jars.packages', 'com.github.astrolabsoftware:spark-fits_2.11:0.9.0'),\
                           ('spark.executor.memoryOverhead', 4096),\
                           # deprecated ('spark.yarn.executor.memoryOverhead', 4096), \
                           ('spark.driver.memoryOverhead', '600m'),\
                           ('spark.driver.maxResultSize', '4g'),\
                           ('spark.rpc.message.maxSize', '512'),\
                           ('spark.scheduler.mode', 'FAIR'),\
                           ('spark.kryoserializer.buffer.max', '1g'),\
                           ('spark.driver.allowMultipleContexts', True), \
                           ('spark.hadoop.hive.exec.dynamic.partition', True), \
                           ('spark.hadoop.hive.exec.dynamic.partition.mode','nonstrict'), \
                           ('spark.hadoop.hive.exec.max.dynamic.partitions', 100000), \
                           ('spark.hadoop.hive.exec.max.dynamic.partitions.pernode', 100000), \
                           ('spark.app.name','Distributed SoFiA Test - dataframe tuning')])


In [24]:
if True:

    sc.stop()
    spark.stop()
    SparkSession._instantiatedContext = None

    sc=SparkContext(conf=conf)
    spark=SparkSession(sc)
    sqlContext = SQLContext(sc)

In [25]:
sc

In [26]:
sqlContext

<pyspark.sql.context.SQLContext at 0x7fa674ae6208>

In [27]:
import numpy as np # linear algebra
import os
import math
import copy
import random
import sys

import time

import py4j.protocol  
from py4j.protocol import Py4JJavaError  
from py4j.java_gateway import JavaObject  
from py4j.java_collections import JavaArray, JavaList

from pyspark import RDD, SparkContext  
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.ml.feature import StringIndexer, StandardScaler,VectorAssembler
from pyspark.ml import Pipeline

from pyspark.sql.window import Window

from pyspark.sql import functions as F
import pyspark.sql.functions as f

from pyspark.sql.functions import udf, col, rand
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector

from pyspark.sql.functions import rand
from pyspark.sql.types import ArrayType, StringType, FloatType,IntegerType, DataType, DoubleType, MapType, Row
from pyspark import SparkContext
from pyspark.mllib.evaluation import MulticlassMetrics

from myLib import GetDetailArrays, GetSubCube2, GetDataframeSize, CreateFITSSubCubeUDF, writeResults 

from astropy.io import fits
from astropy.table import Table
from astropy.visualization import astropy_mpl_style
from astropy.wcs import WCS
from astropy.coordinates import SkyCoord

In [28]:
iType=IntegerType()
dType=DoubleType()
fType=FloatType()

aType=ArrayType(fType)

In [29]:
from time import time, clock
from datetime import datetime
class Timer:
    """
    a simple class for printing time (s) since last call
    """
    def __init__(self):
        self.t0=time()
        self.t1=clock()
        self.elapsed = 1
        self.elapsedCPU = 1
    
    def start(self):
        self.t0=time()
        self.t1=clock()
        
    def stop(self):
        t1=time()
        t2=clock()
        print("Elapsed {:2.1f}s, CPU {:2.1f}s".format(t1-self.t0, t2-self.t1))
        self.elapsed = t1-self.t0
        self.elapsedCPU = t2-self.t1

timer=Timer()

class WriteDataframeError(Exception): 
  
    # Constructor or Initializer 
    def __init__(self, value): 
        self.value = value 
  
    # __str__ is to print() the value 
    def __str__(self): 
        return(repr(self.value)) 


In [30]:
[{
    "name": s.name(),     
    "memSize_MB": float(s.memSize())/ 2**20 , 
    "memSize_GB": float(s.memSize())/ 2**30, 
    "diskSize_MB": float(s.diskSize())/ 2**20, 
    "diskSize_GB": float(s.diskSize())/ 2**30, 
    "numPartitions": s.numPartitions(), 
    "numCachedPartitions": s.numCachedPartitions(),
    "callSite": s.callSite(),
    "externalBlockStoreSize": s.externalBlockStoreSize(),
    "id": s.id(),
    "isCached": s.isCached(),
    "parentIds": s.parentIds(),
    "scope": s.scope(),
    "storageLevel": s.storageLevel(),
    "toString": s.toString()
} for s in sc._jsc.sc().getRDDStorageInfo()]


[]

In [31]:
for s in sc._jsc.sc().getRDDStorageInfo():
    print(s.name())

### Debug testing 

Compares the process times when we cache the temp pables against when we don't

In [32]:
cacheTempTables=False

In [33]:
start_elapsed=time()
start_cpu=clock()

In [34]:
sqlContext.sql('use fits_investigation')

DataFrame[]

In [35]:
sqlContext.sql('desc sparkfits_images').show()

+--------------------+------------+-------+
|            col_name|   data_type|comment|
+--------------------+------------+-------+
|           spi_index|         int|   null|
|           spi_image|array<float>|   null|
|        spi_filename|      string|   null|
|            spi_band|         int|   null|
|# Partition Infor...|            |       |
|          # col_name|   data_type|comment|
|        spi_filename|      string|   null|
|            spi_band|         int|   null|
+--------------------+------------+-------+



In [36]:
for tName in ("collatedImages", "compress_one", "compress_two", "compress_three"):
    try:
        sqlContext.sql("drop table {}".format(tName))
        pass
    except Exception as e:
        msg = " ERROR! "
        if hasattr(e, 'message'):
            msg += str(e.message)
        else:

            msg += str(e)
            pass
        print(msg)

 ERROR! 'Table or view not found: collatedImages;'
 ERROR! 'Table or view not found: compress_one;'
 ERROR! 'Table or view not found: compress_two;'
 ERROR! 'Table or view not found: compress_three;'


In [37]:
#fitsFilename='residual.i.SB8170.cube.fits.NaNs.Removed'
fitsFilename='image.restored.i.SB2338.V2.cube.fits'
raType='RA---SIN'
decType='DEC--SIN'
spectraType='Hz'



In [38]:
raArray, decArray, spectraArray = GetDetailArrays(sqlContext, fitsFilename, raType, decType, spectraType)

In [39]:
print("Range of Ra {} to {}".format( max(raArray), min(raArray)))
print("Range of Declination {} to {}".format( max(decArray), min(decArray)))
print("Frequency ranges in {} - {} {}".format(spectraType, min(spectraArray), max(spectraArray))) 

Range of Ra 337.9596862792969 to 328.57586669921875
Range of Declination -42.05752944946289 to -48.32979202270508
Frequency ranges in Hz - 1376499968.0 1424481536.0


In [40]:
from threading import Thread
from multiprocessing.pool import ThreadPool
#import Queue

In [41]:
def ProcessThread(inputArray):
    # Random sleep to stagger the processes
    if random.randint(0,1):
        sleep(randint(100,400)/1000)
        pass
    #count=sqlContext.sql("select count(*)as count from sparkfits_detail_arrays").select("count")
    #print("Hi there. loRa {} - HiRA {}".format(str(funkyArray[0]), str(funkyArray[1]) ))
    #print("Hi there. loDec {} - HiDec {}".format(str(funkyArray[2]), str(funkyArray[3]) ))
    #print("Hi there. loFreq {} - HiFreq {}".format(str(funkyArray[4]), str(funkyArray[5]) ))
    
    
    hiDec=inputArray[3]
    loDec=inputArray[2]
    hiRa=inputArray[1]
    loRa=inputArray[0]
    loFreq=inputArray[4]
    hiFreq=inputArray[5]
    
    print("hiDec {} loDec {} hiRa {} loRa {} loFreq {} hiFreq {}".format( str(hiDec), str(loDec),str(hiRa),str(loRa),str(loFreq),str(hiFreq) ))

    depthOfCubes=5

    CubeSize=[hiRa,loRa,hiDec,loDec,loFreq,hiFreq]
    print("Cube size is ",CubeSize)
    
    raPix=len( np.array(np.where(np.logical_and(raArray >= loRa, raArray <= hiRa )))[0] )
    decPix=len( np.array(np.where(np.logical_and(decArray >= loDec, decArray <= hiDec )))[0] )
    frqPix=len( np.array(np.where(np.logical_and(spectraArray >= loFreq, spectraArray <= hiFreq )))[0] )
    print(raPix, decPix, frqPix)

    size=raPix*decPix*frqPix*32/8e9 # 8e6 = MB, 8e9= GB
    print(size)

    ntileCount=frqPix/depthOfCubes
    
    subCubeDF,raHeaderIndex,decHeaderIndex,freqHeaderIndex,naxis1, naxis2, naxis4 \
    =GetSubCube2(sqlContext, fitsFilename, decType,spectraType, raArray, raType, \
                 decArray, spectraArray, CubeSize, round(ntileCount) )
        
    running=True
    i=0
    runTries=10
    while running and i <= runTries:
        try:
            if i !=0:
                # second try, random sleep
                sleep(randint(100,400)/1000)
                
            print("GetSubCube2 created - creating temp table")
            subCubeDF.createOrReplaceTempView("collatedImages")
            
            compress1=sqlContext.sql("""
                with rawData as
                (
                    select bins, sda_Frequency_hz, spi_index, ra as rightAscension,
                    map(

                        'dec', sda_declination
                    ) as kva,
                    map(
                        'pixels', raSelectRange
                    ) as kvi
                    from collatedImages
                    distribute by sda_Frequency_hz
                    sort by sda_Frequency_hz, spi_index
                )
                select 
                    sda_Frequency_hz as frequency,rightAscension,
                    bins,
                    collect_list(float(a.kva['dec']))as declination
                    ,collect_list(array(a.kvi['pixels']))as pixs
                from rawData a
                group by frequency, rightAscension, bins            
            """)
            compress1.createOrReplaceTempView("compress_one")
            
            compress2=sqlContext.sql("""
                select
                    bins,
                    rightAscension, declination,
                    collect_list(array(float(a.kvi['frequencies']))) as frequencies,
                    collect_list(array(a.kva['pixs'])) as pixels
                from (
                    select
                        bins,
                        rightAscension, declination,
                        map('frequencies', frequency) as kvi,
                        map('pixs', pixs) as kva
                    from compress_one
                    distribute by rightAscension
                    sort by frequency
                ) a
                group by bins,
                rightAscension, declination            
            """)
            
            originalHeader=sqlContext.sql("""
                with rawData as
                (
                    select grp, sfh_index, 
                    map(

                        'key', sfh_key
                    ) as kva,
                    map(
                        'value', sfh_value
                    ) as kvi
                    from (
                    --headers
                        select 1 as grp, sfh_index, sfh_key, sfh_value 
                        from sparkfits_fits_headers
                        where sfh_fits_file='{}' 
                        order by sfh_index
                    ) a
                    distribute by grp
                    sort by grp, sfh_index
                )
                select 
                    grp,
                    collect_list(string(a.kva['key']))as keys
                    ,collect_list(string(a.kvi['value']))as values
                from rawData a
                group by grp            
            """.format(fitsFilename)).select("keys","values")
            
            compress3 = compress2.crossJoin(originalHeader)
            test1 = udf(CreateFITSSubCubeUDF, StringType() )
            
            
            compress3=compress3\
            .withColumn('process_msg', test1(compress3.rightAscension, \
                                           compress3.declination, \
                                           compress3.frequencies, \
                                           compress3.pixels, \
                                           compress3.keys, \
                                           compress3.values, \
                                           f.lit(fitsFilename)))
            print("compress3 created")
            dfout=compress3.select('bins','process_msg')
            n=dfout.count()
            
            
            print("dfout created - {} rows".format(str( n )))
            from datetime import datetime
            now = datetime.now()
            date_time = now.strftime("%d/%m/%Y, %H:%M:%S")
            print(date_time)
            
            j=0
            writeTries=10
            while running and j <= writeTries:
                try:
                    dfout.select(F.lit(date_time).alias("runDate"), "bins", "process_msg" )\
                    .write.mode('append').format('parquet').saveAsTable('df_result')
                    print("Results written")
                    running=False
                except Exception as e:
                    if j == writeTries:
                        errMsg=("Writing results to df_result has failed - {}".format(str(e)))
                        raise(WriteDataframeError(errMsg))
                        pass
                    print("Writing results to df_result has failed - retrying...")
                j+=1

                
        except WriteDataframeError as e:
            print(str(e))
            print("Retrying ProcessThread...")
        except Exception as e:
            ## caused by multi[ple inserts as singlerows
            print("WARNING ProcessThread FAILURE...retrying...")
            
            print("Cleaning out the cached dataframes")
            for (id, rdd) in spark.sparkContext._jsc.getPersistentRDDs().items():
                rdd.unpersist()
                pass
            spark.sparkContext._jsc.getPersistentRDDs().items()
            pass
        finally:
            if i == runTries:
                print("ERROR - ProcessThread FAILURE...{}".format(str(e)))
                running=False        
        i+=1
        pass
    
    for (id, rdd) in spark.sparkContext._jsc.getPersistentRDDs().items():
        rdd.unpersist()

    #writeResults(subCubeDF, 'append', 'parquet', 'collatedImages')
    #CubeSize=[hiRa,loRa,hiDec,loDec,loFreq,hiFreq]
    print("done! with ProcessThread")
    print("")
    print("================================================================================")


In [42]:
raBucketSize=15
decBucketSize=15
freqBucketSize=15

raHist=np.histogram(raArray, raBucketSize)
decHist=np.histogram(np.flip(decArray), decBucketSize)
freqHist=np.histogram(spectraArray, freqBucketSize)
arr = np.empty((0,6), float)

for i in np.arange(raBucketSize):
    for j in  np.arange(decBucketSize):
        for k in np.arange(freqBucketSize):
            
            x=np.array([[raHist[1][i], raHist[1][i+1], decHist[1][j], decHist[1][j+1], freqHist[1][k], freqHist[1][k+1] ]])
            arr=np.append(arr, x, axis=0)


In [43]:
arr[0:10,:]

array([[ 3.28575867e+02,  3.29201455e+02, -4.83297920e+01,
        -4.79116412e+01,  1.37649997e+09,  1.37969874e+09],
       [ 3.28575867e+02,  3.29201455e+02, -4.83297920e+01,
        -4.79116412e+01,  1.37969874e+09,  1.38289751e+09],
       [ 3.28575867e+02,  3.29201455e+02, -4.83297920e+01,
        -4.79116412e+01,  1.38289751e+09,  1.38609628e+09],
       [ 3.28575867e+02,  3.29201455e+02, -4.83297920e+01,
        -4.79116412e+01,  1.38609628e+09,  1.38929505e+09],
       [ 3.28575867e+02,  3.29201455e+02, -4.83297920e+01,
        -4.79116412e+01,  1.38929505e+09,  1.39249382e+09],
       [ 3.28575867e+02,  3.29201455e+02, -4.83297920e+01,
        -4.79116412e+01,  1.39249382e+09,  1.39569260e+09],
       [ 3.28575867e+02,  3.29201455e+02, -4.83297920e+01,
        -4.79116412e+01,  1.39569260e+09,  1.39889137e+09],
       [ 3.28575867e+02,  3.29201455e+02, -4.83297920e+01,
        -4.79116412e+01,  1.39889137e+09,  1.40209014e+09],
       [ 3.28575867e+02,  3.29201455e+02, -4.832

In [44]:
from random import randint
from time import sleep
gNumProcs=5
#q=Queue.Queue()
tpool=ThreadPool(processes=gNumProcs)

data=sc.parallelize(arr[0:10,:])

masterLoop=data.map(lambda row: [float(row[0]),float(row[1]),float(row[2]),float(row[3]), \
float(row[4]),float(row[5])] )

timer.start()
print("Loading pools")
billy=tpool.map(ProcessThread, [x for x in masterLoop.toLocalIterator()])
timer.stop()

print("All Done")

#def runQuery(sc,<POOL_ID>,query):
#    sc.setLocalProperty("spark.scheduler.pool", pool_id)
#    .....<your code>
#    return df

#t1 = threading.thread(target=runQuery,args=(sc,"1",<query1>)
#t2 = threading.thread(target=runQuery,args=(sc,"2",<query2>)

# start the threads...
#t1.start()
#t2.sart()

# wait for the threads to complete and get the returned data frames...
#df1 = t1.join()
#df2 = t2.join()


Loading pools
hiDec -47.911641184488936 loDec -48.32979202270508 hiRa 329.201454671224 loRa 328.57586669921875 loFreq 1379698739.2 hiFreq 1382897510.4hiDec -47.911641184488936 loDec -48.32979202270508 hiRa 329.201454671224 loRa 328.57586669921875 loFreq 1382897510.4 hiFreq 1386096281.6
Cube size is  [329.201454671224, 328.57586669921875, -47.911641184488936, -48.32979202270508, 1382897510.4, 1386096281.6]
hiDec -47.911641184488936 loDec -48.32979202270508 hiRa 329.201454671224 loRa 328.57586669921875 loFreq 1389295052.8 hiFreq 1392493824.0
Cube size is  [329.201454671224, 328.57586669921875, -47.911641184488936, -48.32979202270508, 1389295052.8, 1392493824.0]
374 377 173
0.097570616
Caching intermediate dataframes...

Cube size is  [329.201454671224, 328.57586669921875, -47.911641184488936, -48.32979202270508, 1379698739.2, 1382897510.4]
374Parameters extracted 377 173
0.097570616
Caching intermediate dataframes...
Parameters extracted
Ra select range determined

Ra select range determ

compress3 created
dfout created - 173 rows
10/01/2021, 00:01:00
dfout created - 173 rows
10/01/2021, 00:01:03
dfout created - 173 rows
10/01/2021, 00:01:15
dfout created - 0 rows
10/01/2021, 00:01:22
Final base image dataframe created...removing intermediate dataframes from cache...
Elapsed 113.1s, CPU 0.9s
GetSubCube2 created - creating temp table
compress3 created
dfout created - 173 rows
10/01/2021, 00:01:38
Results written
done! with ProcessThread

Results written
done! with ProcessThread

Results written
done! with ProcessThread

Results written
done! with ProcessThread

Results written
done! with ProcessThread

Elapsed 429.3s, CPU 3.1s
All Done


In [67]:
hiDec=np.flip(decHist[1])[0]
loDec=np.flip(decHist[1])[1]
hiRa=raHist[1][1]
loRa=raHist[1][0]
loFreq=freqHist[1][0] #spectraArray[0]
hiFreq=freqHist[1][1] #spectraArray[299]

depthOfCubes=5

CubeSize=[hiRa,loRa,hiDec,loDec,loFreq,hiFreq]

In [68]:
raPix=len( np.array(np.where(np.logical_and(raArray >= loRa, raArray <= hiRa )))[0] )
decPix=len( np.array(np.where(np.logical_and(decArray >= loDec, decArray <= hiDec )))[0] )
frqPix=len( np.array(np.where(np.logical_and(spectraArray >= loFreq, spectraArray <= hiFreq )))[0] )
print(raPix, decPix, frqPix)

size=raPix*decPix*frqPix*32/8e9 # 8e6 = MB, 8e9= GB
print(size)

561 565 260
0.3296436


## Calculate the ntile size

The number of frequencies in each row of the final dataframe is determined by the variable depthOfCubes. We divide the total number of frequencies we are extracting by depthOfCubes to get the number of bins we want.

Generally speaking, this should mean that depthOfCubes divides exactly into the number of frequencies, frqPix.

In [69]:
ntileCount=frqPix/depthOfCubes
round(ntileCount,0)

52.0

In [70]:
subCubeDF,raHeaderIndex,decHeaderIndex,freqHeaderIndex,naxis1, naxis2, naxis4 \
=GetSubCube2(sqlContext, fitsFilename, decType,spectraType, raArray, raType, decArray, spectraArray, CubeSize, round(ntileCount) )
#GetSubCube2(sqlContext, fitsFilename, decType,spectraType, raArray, decArray, spectraArray, CubeSize, cacheTempTables)

Parameters extracted
Ra select range determined
Raw RA data extracted
Filtered RA data extracted
Raw Declination data extracted
Raw Frequency data extracted
Filtered Dec and Frequency data extracted
Raw Images data extracted


In [71]:
writeResults(subCubeDF, 'overwrite', 'parquet', 'collatedImages')
# sqlContext.sql("""select * from collatedImages""").show()

### NOTE!

In order to maintain order when we construct the collect_lists, we need to use DISTRIBUTE BY to guarantee that all rowss for a particular frequency are routed to the same reducer; otherwise we can't guarantee that the order will be maintained.

See https://stackoverflow.com/questions/45092576/how-to-use-order-by-with-collect-set-operation-in-hive

#### REMEMBER!

It's not a relational database where the ordering in usually implicit!

### NOTE!

Drop the temporary tables when you've finished with them to free up resources. 

For temporary views spark.catalog.dropTempView("df") - these are the session scoped tables, ie. the ones we use in this notebook.

For global views spark.catalog.dropGlobalTempView("df")

We can also free up resources for dataframes we have finished using, i.e. df.unpersist()

In [72]:
compress1=sqlContext.sql("""
with rawData as
(
    select bins, sda_Frequency_hz, spi_index, ra as rightAscension,
    map(
        
        'dec', sda_declination
    ) as kva,
    map(
        'pixels', raSelectRange
    ) as kvi
    from collatedImages
    distribute by sda_Frequency_hz
    sort by sda_Frequency_hz, spi_index
)
select 
    sda_Frequency_hz as frequency,rightAscension,
    bins,
    collect_list(float(a.kva['dec']))as declination
    ,collect_list(array(a.kvi['pixels']))as pixs
from rawData a
group by frequency, rightAscension, bins
""")

if False:
    compress1.show()

writeResults(compress1, 'overwrite', 'parquet', 'compress_one')
sqlContext.sql("""drop table collatedImages""")
#sqlContext.sql("""select * from compress_one""").show()
#compress1.createOrReplaceTempView("compress_one")
if cacheTempTables:
    print("caching...")
    compress1.cache().count()
    spark.catalog.cacheTable("compress_one") 
    
    # No longer need collated images, so unpersist

    subCubeDF.unpersist()
    spark.catalog.uncacheTable("collatedImages")

### Now, let's take it that one step further and compress those three frequencies into one row

In [73]:
compress2=sqlContext.sql("""
select
    bins,
    rightAscension, declination,
    collect_list(array(float(a.kvi['frequencies']))) as frequencies,
    collect_list(array(a.kva['pixs'])) as pixels
from (
    select
        bins,
        rightAscension, declination,
        map('frequencies', frequency) as kvi,
        map('pixs', pixs) as kva
    from compress_one
    distribute by rightAscension
    sort by frequency
) a
group by bins,
rightAscension, declination
""")

writeResults(compress2, 'overwrite', 'parquet', 'compress_two')
sqlContext.sql("""drop table compress_one""")

if cacheTempTables:
    print("caching...")
    compress2.cache().count()

    # finished with compress1, unpersist
    compress1.unpersist()
    spark.catalog.uncacheTable("compress_one")

## Function to create the new header as a dictionary object

So we can pass this in to the UDF

In [74]:
originalHeader=sqlContext.sql("""
with rawData as
(
    select grp, sfh_index, 
    map(
        
        'key', sfh_key
    ) as kva,
    map(
        'value', sfh_value
    ) as kvi
    from (
    --headers
        select 1 as grp, sfh_index, sfh_key, sfh_value 
        from sparkfits_fits_headers
        where sfh_fits_file='{}' 
        order by sfh_index
    ) a
    distribute by grp
    sort by grp, sfh_index
)
select 
    grp,
    collect_list(string(a.kva['key']))as keys
    ,collect_list(string(a.kvi['value']))as values
from rawData a
group by grp
""".format(fitsFilename)).select("keys","values")

# originalHeader.show()

In [75]:
c2=sqlContext.sql("select * from compress_two")
compress3 = c2.crossJoin(originalHeader)
writeResults(compress3, 'overwrite', 'parquet', 'compress_three')
#sqlContext.sql("""select * from compress_three""").show()

In [77]:
for tName in ("collatedImages", "compress_one", "compress_two"):
    try:
        sqlContext.sql("drop table {}".format(tName))
        pass
    except Exception as e:
        msg = " ERROR! "
        if hasattr(e, 'message'):
            msg += str(e.message)
        else:

            msg += str(e)
            pass
        print(msg)

 ERROR! 'Table or view not found: collatedImages;'
 ERROR! 'Table or view not found: compress_one;'


In [29]:
if cacheTempTables:
    compress3.cache().count()
    compress2.unpersist()

In [78]:
[{
    "name": s.name(),     
    "memSize_MB": float(s.memSize())/ 2**20 , 
    "memSize_GB": float(s.memSize())/ 2**30, 
    "diskSize_MB": float(s.diskSize())/ 2**20, 
    "diskSize_GB": float(s.diskSize())/ 2**30, 
    "numPartitions": s.numPartitions(), 
    "numCachedPartitions": s.numCachedPartitions(),
    "callSite": s.callSite(),
    "externalBlockStoreSize": s.externalBlockStoreSize(),
    "id": s.id(),
    "isCached": s.isCached(),
    "parentIds": s.parentIds(),
    "scope": s.scope(),
    "storageLevel": s.storageLevel(),
    "toString": s.toString()
} for s in sc._jsc.sc().getRDDStorageInfo()]


[{'name': 'CartesianProduct\n:- InMemoryTableScan [sda_index#1097, sda_declination#1098]\n:     +- InMemoryRelation [sda_index#1097, sda_declination#1098], true, 10000, StorageLevel(disk, memory, 1 replicas)\n:           +- Filter ((isnotnull(sda_declination#1098) && (cast(sda_declination#1098 as double) <= -42.05752944946289)) && (cast(sda_declination#1098 as double) >= -43.311981964111325))\n:              +- InMemoryTableScan [sda_index#1097, sda_declination#1098], [isnotnull(sda_declination#1098), (cast(sda_declination#1098 as double) <= -42.05752944946289), (cast(sda_declination#1098 as double) >= -43.311981964111325)]\n:                    +- InMemoryRelation [sda_index#1097, sda_declination#1098], true, 10000, StorageLevel(disk, memory, 1 replicas)\n:                          +- Project [pos#125 AS sda_index#120, col#126 AS sda_declination#121]\n:                             +- Generate posexplode(sda_detail_array#122), false, [pos#125, col#126]\n:                               

In [79]:
print("Cleaning out the cached dataframes")
for (id, rdd) in spark.sparkContext._jsc.getPersistentRDDs().items():
    rdd.unpersist()
    pass
spark.sparkContext._jsc.getPersistentRDDs().items()

Cleaning out the cached dataframes


ItemsView({})

In [80]:
compress3.printSchema()

root
 |-- bins: integer (nullable = true)
 |-- rightAscension: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- declination: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- frequencies: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: float (containsNull = true)
 |-- pixels: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: float (containsNull = true)
 |-- keys: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- values: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [81]:
from pyspark.sql.types import MapType, StructType,StructField

schema = ArrayType(StructType([
    StructField("msgNum", IntegerType(), False),
    StructField("message", StringType(), False)
]))

schema1 = StructType([
    
    StructField("msgNum", IntegerType(), False),
    StructField("message", StringType(), False),
])

test = udf(CreateFITSSubCubeUDF, ArrayType(IntegerType() ) )
test1 = udf(CreateFITSSubCubeUDF, StringType() ) 
test2 = udf(CreateFITSSubCubeUDF, schema ) 

In [82]:
dfout=sqlContext.sql("select * from compress_three ")

In [83]:
dfout=dfout\
.withColumn('process_msg', test1(dfout.rightAscension, \
                                           dfout.declination, \
                                           dfout.frequencies, \
                                           dfout.pixels, \
                                           dfout.keys, \
                                           dfout.values, \
                                           f.lit(fitsFilename))).select('bins','process_msg')

In [84]:
dfout.printSchema()
dfSize=GetDataframeSize(sc,dfout)

print(dfSize)

root
 |-- bins: integer (nullable = true)
 |-- process_msg: string (nullable = true)

1943.73456


In [85]:
from datetime import datetime
now = datetime.now() 
date_time = now.strftime("%d/%m/%Y, %H:%M:%S")
date_time

'28/11/2020, 22:41:00'

In [86]:
timer.start()
try:    
    #resBins=dfout.cache().count()
    #print("dfout has calculated, computing {} bins. Writing results...".format(resBins))
    dfout.select(F.lit(date_time).alias("runDate"), "bins", "process_msg" )\
    .write.insertInto("fits_investigation.df_result", False)
    
    print("Results written")
    
except Exception as e:
    msg = " ERROR! "
    if hasattr(e, 'message'):
        msg += str(e.message)
    else:
        
        msg += str(e)
        pass
    print(msg)

finally:
    print("Cleaning out the cached dataframes")
    for (id, rdd) in spark.sparkContext._jsc.getPersistentRDDs().items():
        rdd.unpersist()
        pass
    spark.sparkContext._jsc.getPersistentRDDs().items()

timer.stop()

Results written
Cleaning out the cached dataframes
Elapsed 24.3s, CPU 0.0s


In [87]:
end_elapsed=time()
end_cpu=clock()

elapsed=end_elapsed - start_elapsed
print(elapsed)

1232.5309450626373


In [39]:
LogResults=True

if cacheTempTables:
    cached = "Cached"
    pass
else:
    cached = "Not cached"
    
cube_dimensions=[raPix,decPix,frqPix]

if LogResults:
    print("Logging results...")
    Results = Row(
        "runDate",
        "Fits_filename",      
        "cube_Dimensions",
        "Cached",
        "SizeEstimate_NumPix",  
        "SizeEstimate_JavaObj",        
        "total_elapsed")
    result=Results(
        date_time,
        fitsFilename,
        cube_dimensions,
        cached,
        size,
        dfSize,
        elapsed)



    resultsDF=spark.createDataFrame([result])
    MODE='append'
    FORMAT='parquet'
    TABLE='SoFiADataframeTestJupyter'
    
    writeResults(resultsDF, MODE, FORMAT, TABLE)
    print("results logged")
    


Logging results...
results logged


In [40]:
sqlContext.sql("select * from {} ".format('SoFiADataframeTestJupyter')).show(60)

+--------------------+--------------------+----------------+----------+-------------------+--------------------+------------------+
|             runDate|       Fits_filename| cube_Dimensions|    Cached|SizeEstimate_NumPix|SizeEstimate_JavaObj|     total_elapsed|
+--------------------+--------------------+----------------+----------+-------------------+--------------------+------------------+
|26/11/2020, 22:16:29|image.restored.i....|[597, 677, 1000]|Not cached|           1.616676|         1281.695904|230.01169610023499|
|26/11/2020, 21:12:40|image.restored.i....|[597, 677, 1000]|Not cached|           1.616676|         1284.755904| 573.3540256023407|
|27/11/2020, 01:31:40|image.restored.i....|[597, 677, 1000]|Not cached|           1.616676|          1136.27072|  725.933874130249|
|27/11/2020, 04:26:44|image.restored.i....| [561, 565, 260]|Not cached|          0.3296436|         1110.913216|164.38950848579407|
|27/11/2020, 01:25:14|image.restored.i....|[597, 677, 1000]|Not cached|     

In [41]:
sqlContext.sql("REFRESH TABLE {} ".format('df_result'))

DataFrame[]

In [42]:
sqlContext.sql("select * from {} ".format('df_result')).show(180)

+--------------------+----+--------------------+
|             runDate|bins|         process_msg|
+--------------------+----+--------------------+
|27/11/2020, 02:00:16|   1|All complete, no ...|
|27/11/2020, 02:00:16|   2|All complete, no ...|
|27/11/2020, 02:00:16|   3|All complete, no ...|
|27/11/2020, 02:00:16|   4|All complete, no ...|
|27/11/2020, 02:00:16|   5|All complete, no ...|
|27/11/2020, 02:00:16|   6|All complete, no ...|
|27/11/2020, 02:00:16|   7|All complete, no ...|
|27/11/2020, 02:00:16|   8|All complete, no ...|
|27/11/2020, 02:00:16|   9|All complete, no ...|
|27/11/2020, 02:00:16|  10|All complete, no ...|
|27/11/2020, 02:00:16|  11|All complete, no ...|
|27/11/2020, 02:00:16|  12|All complete, no ...|
|27/11/2020, 02:00:16|  13|All complete, no ...|
|27/11/2020, 02:00:16|  14|All complete, no ...|
|27/11/2020, 02:00:16|  15|All complete, no ...|
|27/11/2020, 02:00:16|  16|All complete, no ...|
|27/11/2020, 02:00:16|  17|All complete, no ...|
|27/11/2020, 02:00:1

In [43]:
print("All done - {} run, elapsed {}, size {} ".format(cached, str(elapsed), str(dfSize) ))

All done - Not cached run, elapsed 164.38950848579407, size 1110.913216 
