# The Extract Transform and Load (ETL) procedure

This notebook will walk you through the process that loads the data from a FITS cube into the Parquet tables on a Hive enables HDFS cluster. 

## Part 2 Load the data into the Parquet table



### New Spark Context

For these processes, we need to change the spark context parameters because we're using this notebook to dynamically insert data into Hive partitioned tables; this means we need to set the hive.exec.dynamic parameters. See the context configuration below

In [2]:
conf = SparkConf().setAll([('spark.executor.memory', '8g'),\
                           ('spark.driver.memory', '6g'),\
                           ('spark.shuffle.service.enabled', True), \
                           ('spark.sql.shuffle.partitions', 144), \
                           ('spark.default.parallelism', 144),\
                           ('spark.dynamicAllocation.enabled', True), \
                           ('spark.dynamicAllocation.executorIdleTimeout', 600), \
                           ('spark.executor.cores', 4),\
                           ('spark.executor.instances', 40), \
                           ('spark.local.dir','/mnt/FITs/Spark/tmp'),\
                           ('spark.jars.packages', 'com.github.astrolabsoftware:spark-fits_2.11:0.9.0'),\
                           ('spark.executor.memoryOverhead', '6g'),\
                           ('spark.driver.memoryOverhead', '4g'),\
                           ('spark.rpc.message.maxSize', '512'),\
                           ('spark.scheduler.mode', 'FAIR'),\
                           ('spark.kryoserializer.buffer.max', '1g'),\
                           ('spark.driver.allowMultipleContexts', True), \
                           ('spark.hadoop.hive.exec.dynamic.partition', True), \
                           ('spark.hadoop.hive.exec.dynamic.partition.mode','nonstrict'), \
                           ('spark.hadoop.hive.exec.max.dynamic.partitions', 100000), \
                           ('spark.hadoop.hive.exec.max.dynamic.partitions.pernode', 100000), \
                           ('spark.app.name','Fits on HDFS')])


In [3]:
if True:

    sc.stop()
    spark.stop()
    SparkSession._instantiatedContext = None

    sc=SparkContext(conf=conf)
    spark=SparkSession(sc)
    sqlContext = SQLContext(sc)

## Library import

In [4]:

import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
from astropy.io import fits
from astropy.table import Table
from astropy.visualization import astropy_mpl_style
from astropy.wcs import WCS
plt.style.use(astropy_mpl_style)

## Let's run a simple object finder on our image,
## and collect the catalog.
import numpy as np
from photutils import DAOStarFinder
from astropy.stats import sigma_clipped_stats

from scipy import sparse
from scipy.sparse import csr_matrix

from random import randint
from time import sleep

from spectral_cube import SpectralCube
from astroquery.esasky import ESASky
from astroquery.utils import TableList
from reproject import reproject_interp
import astropy.units as u

from pyspark.sql import SQLContext, SparkSession, HiveContext
from pyspark import StorageLevel

from pyspark import SparkFiles

from pyspark.sql import functions as F
import pyspark.sql.functions as f

from pyspark.sql.functions import randn, monotonically_increasing_id, row_number, desc, udf, col, lit
from pyspark.sql.functions import broadcast
from pyspark.sql.types import ArrayType, FloatType,IntegerType, DataType, DoubleType,Row, BooleanType

"""
Set the Spark datatypes
"""
iType=IntegerType()
dType=DoubleType()
fType=FloatType()

from pyspark.sql.functions import pandas_udf, PandasUDFType

from pyspark.sql.window import Window

from pyspark.ml.linalg import Vectors
from pyspark.ml.linalg import SparseVector, DenseVector,VectorUDT

from datetime import datetime
import time
import logging
import sys



maindir is /home/hduser/.astropy/config
maindir is /home/hduser/.astropy/config
maindir is /home/hduser/.astropy/cache
maindir is /home/hduser/.astropy/cache
maindir is /home/hduser/.astropy/cache
maindir is /home/hduser/.astropy/cache
maindir is /home/hduser/.astropy/cache
maindir is /home/hduser/.astropy/cache
maindir is /home/hduser/.astropy/config
maindir is /home/hduser/.astropy/config
maindir is /home/hduser/.astropy/cache


#### Classes 

In [8]:
from time import time, clock
class Timer:
    """
    a simple class for printing time (s) since last call
    """
    def __init__(self):
        self.t0=time()
        self.t1=clock()
        self.elapsed = 1
        self.elapsedCPU = 1
    
    def start(self):
        self.t0=time()
        self.t1=clock()
        
    def stop(self):
        t1=time()
        t2=clock()
        print("Elapsed {:2.1f}s, CPU {:2.1f}s".format(t1-self.t0, t2-self.t1))
        self.elapsed = t1-self.t0
        self.elapsedCPU = t2-self.t1

class DataframeError(Exception): 
  
    # Constructor or Initializer 
    def __init__(self, value): 
        self.value = value 
  
    # __str__ is to print() the value 
    def __str__(self): 
        return(repr(self.value)) 

## Functions

The CreateAndSaveArrays function saves the positional data (in this case, Right Ascension, Declination and Frequency) as an array element in a permanent Parquet table. the sparkfits_detail_arrays table is physically partitioned on the filename of the source FITS file and the array detail type (for example, 'RA -- SIN', 'DEC -- SIN ' and a spectra trype 'Hz', 'GHz' or 'km s-1'.

In [9]:
def CreateAndSaveArrays(array, filename, arrayCoordinates, cubeSpectraUnits=None ):
    if cubeSpectraUnits == None:
        sda_detail_name=arrayCoordinates
    else:
        sda_detail_name=cubeSpectraUnits

    #logger.in(filename, sda_detail_name )

    arrayRDD=sc.parallelize(array).zipWithIndex().map(lambda x :(x[1], x[0]) )
    arrayDF=arrayRDD.map(lambda x: (x[0], x[1].tolist()) ).toDF(["index", "spectra"])

    #"""
    #
    # Maintaining order within a collect_list, See
    # https://stackoverflow.com/questions/50766764/sorting-within-collect-list-in-hive
    #
    #"""

    arrayDF.registerTempTable("SPECTRAS")

    sqlStmt="""
    with myData as
    (
        select 1 as grp,
        index as idx,
        map(
            'spectraVal', spectra
        ) as kv
        from SPECTRAS
        distribute by grp
        sort by grp, index
    )
    select collect_list( float(a.kv['spectraVal']) ) as sda_detail_array 
    from myData a
    group by grp
    """

    spectraArrayDF=spark.sql(sqlStmt)
    spectraArrayDF=spectraArrayDF.select("sda_detail_array"\
                                         ,lit(filename).alias("sda_filename")\
                                         ,lit(sda_detail_name).alias("sda_detail_type"))

    spectraArrayDF.registerTempTable("ARRAYS")
    spark.sql("""
    insert overwrite table fits_investigation.sparkfits_detail_arrays 
    partition(sda_filename , sda_detail_type) 
    select sda_detail_array, sda_filename, sda_detail_type
    from ARRAYS
    """)



In [42]:
def GetDimensions(cube, fileName, cubeDecCoordinate, cubeSpectraUnits, cubeDecDim):
    
    _, dec, _ = cube.world[0, :, 0]  #extract latitude world coordinates from cube dec
    _, _, ra = cube.world[0, 0, :]  #extract longitude world coordinates from cube ra

    """
    Make sure that we cast the Ra and Dec arrays explicitly to float32
    """
    ra=np.array(ra, dtype=np.float32)
    # and flip - always want declination ordered from greatest to lowest. eg -18 ..... -27 or 43 ..... 21
    #if max(dec) <0 or 1==1:
    
    dec=np.flip( np.array(dec, dtype=np.float32) )
    
    #else:
    #    dec=np.array(dec, dtype=np.float32)
    
    """
    
    Get the Spectral Axis data.
    Note: Further enhancements can include changing the values of the Spectral axis - ie change from Hz to redshift or
    velocity.
    
    """
    spectralAxis=cube.spectral_axis
    
    """
    Cast the SpectralCube explicitily to a numpy array, float 32
    """
    
    specArray=np.array(spectralAxis , dtype=np.float32)
    
    print("arrays created")

    return ra, dec, specArray

Extract and save the source FITS file header data to a Parquet table

In [10]:
def CreateFITSHeaderData(header, fitsFilename):
    hduArray=np.empty((0, 4))
    Results=Row("fits_file","index","key","value","comment")
    for i in np.arange(len(header)):
        theKey=list(header.keys())[int(i)]
        theValue=header[int(i)]
        theComment=header.comments[int(i)]

        #print(fitsFilename, theKey, theValue, theComment)
        if theKey and not theKey.isspace():
            result=Results(fitsFilename, int(i), theKey, str(theValue), header.comments[int(i)])

            if i == 0:
                hduDF=spark.createDataFrame([result])
            else:
                newRow=spark.createDataFrame([result])
                hduDF = hduDF.union(newRow)

        else:
            pass
    
    hduDF.printSchema()
    hduDF.registerTempTable("HEADERS")
    
    spark.sql("""
    insert overwrite table fits_investigation.sparkfits_fits_headers partition(sfh_fits_file ) 
    select index, key, value, comment, fits_file from HEADERS
    """)

    print('Header data created')

In [11]:
def rowdf_into_imagerdd(df, final_num_partition=1):
    """
    Reshape a DataFrame of rows into a RDD containing the full image
    in one partition.

    Parameters
    ----------
    df : DataFrame
        DataFrame of image rows.
    final_num_partition : Int
        The final number of partitions. Must be one (default) unless you
        know what you are doing.

    Returns
    ----------
    imageRDD : RDD
        RDD containing the full image in one partition
        
    NOTE: With larger FITS files, we need to avoid the use of .glom() as it results in larger memort usage.
        From the documentation -
        .glom(self) - Return an RDD created by coalescing all elements within each partition into a list.
        
    return df.rdd.zipWithIndex().coalesce(final_num_partition).glom()    
    """

    return df.rdd.zipWithIndex() #.coalesce(final_num_partition).glom()

def clean_image(im):
    """
    Cleans out the Nan elements in the array
    """
    #im=im[~np.isnan(im).any(axis=1)]
    im = np.nan_to_num(im)
    return im


def replace_nan_with_float(im):
    """
    Replaces Nan elements with a random float
    """
    
    #nan_mask = np.isnan( np.array(im, dtype=np.float32) )
    nan_mask = np.isnan( np.array(im) )
    
    sampl = np.random.uniform(low=0.01, high=13.3, size=(im.shape))
    
    im[nan_mask] = sampl[nan_mask]
    
    #random_replace = np.vectorize(lambda x: np.random.random() if np.isnan(x) else x)
    #im=random_replace(im)
    return im
    

def calculateBand(x, bandCount):
    x = x // bandCount
    return x

def CastStructToArray(StructCol):
    imageArray = np.asarray(StructCol[0]).tolist()
    return imageArray

def calculateBand(x, bandCount=32):
    x = x  // bandCount
    return x

def dummyReturn(x):
    return x.tolist()

def returnAsIs(x):
    return x

# ====================================================
# Write results to Parquet table
# ====================================================

def writeResults(sc, resultDF, vMode, vFormat, vTable):
    
    resultDF.write.mode(vMode).format(vFormat).saveAsTable(vTable)
    
def GetCubePositionData(cube, fileName, cubeDecCoordinate, cubeSpectraUnits, cubeDecDim):
    
    _, dec, _ = cube.world[0, :, 0]  #extract latitude world coordinates from cube dec
    _, _, ra = cube.world[0, 0, :]  #extract longitude world coordinates from cube ra

    """
    Make sure that we cast the Ra and Dec arrays explicitly to float32
    """
    ra=np.array(ra, dtype=np.float32)
    # and flip - always want declination ordered from greatest to lowest. eg -18 ..... -27 or 43 ..... 21
    #if max(dec) <0 or 1==1:
    
    dec=np.flip( np.array(dec, dtype=np.float32) )
    
    #else:
    #    dec=np.array(dec, dtype=np.float32)
    
    """
    
    Get the Spectral Axis data.
    Note: Further enhancements can include changing the values of the Spectral axis - ie change from Hz to redshift or
    velocity.
    
    """
    spectralAxis=cube.spectral_axis
    
    """
    Cast the SpectralCube explicitily to a numpy array, float 32
    """
    
    specArray=np.array(spectralAxis , dtype=np.float32)
    
    print("arrays created")
    """ parallelize the arrays 
    
    NOTE: 
    As spark does not necessarily maintain the order of the values in a join, we need to specifically define the ordering
    in order to maintain the correct relationships between the position and spectra arrays we extract from te FITS cube via 
    SpectralCube during the population process.
    
    To do this, we create a parallelised array for the spectra and declinatino with a zip index. the cartesian join is then sorted 
    spectra and declination indexes, and then the index is created for the cartesian RDD.
    
    This ensures the proper sorting and ordering of the combined data.
    
    Note that this is a potentially an expensive computational operation on larget arrays, but as it is only a one-off
    computation per file, this is acceptable.
    """
    rdd1=sc.parallelize(specArray).zipWithIndex()
    rdd2=sc.parallelize(dec).zipWithIndex()
    
    print("and parallelized")
    
    """ Ordered cartesian join as a basis of the dataframe """
    fDecDF=rdd1.cartesian(rdd2)\
    .sortBy(lambda x:(x[0][1], x[1][1]), True)\
    .zipWithIndex()\
    .map(lambda x: (x[0][0][0].tolist(), x[0][1][0].tolist(), x[1]))\
    .toDF(["spd_spectra_value","spd_declination", "spd_index"])\
    .select("spd_index", \
                castToFloat("spd_spectra_value").alias("spd_spectra_value"), \
                castToFloat("spd_declination").alias("spd_declination"), \
                lit(fileName).alias("spd_filename"), \
                lit(cubeDecCoordinate).alias("spd_position_type"), \
                lit(cubeSpectraUnits).alias("spd_spectra_type"), \
                GetBandUDF("spd_index", lit(cubeDecDim)).alias("spd_band")  )                
    print("dataframe created")
    return ra, dec, specArray, fDecDF


## UDF Wrapper functions

In [12]:
GetBandUDF = udf(lambda arr, bw: calculateBand(np.int(arr), bw), iType)
CastToInt = udf(lambda x: dummyReturn(x), iType)
CastToFloatArrayUDF = udf(lambda x : CastStructToArray(x), ArrayType(fType))

castToFloat = udf(lambda arr: returnAsIs(arr), fType)


Next two cells are a chack your connextino through to your Hive database are correctly configured

In [13]:
sqlContext.sql("use fits_investigation")

DataFrame[]

In [14]:
sqlContext.sql("show tables").show()

+------------------+--------------------+-----------+
|          database|           tableName|isTemporary|
+------------------+--------------------+-----------+
|fits_investigation|          array_test|      false|
|fits_investigation|declimation_dim_temp|      false|
|fits_investigation|    fits_header_data|      false|
|fits_investigation|          float_test|      false|
|fits_investigation|  imagetestsparkfits|      false|
|fits_investigation|   localparalleltest|      false|
|fits_investigation|localparalleltest...|      false|
|fits_investigation|localparalleltest...|      false|
|fits_investigation|localparalleltest...|      false|
|fits_investigation|  newdfdeletemelater|      false|
|fits_investigation|     parquetdataview|      false|
|fits_investigation|parquetpartitione...|      false|
|fits_investigation|parquetpartitione...|      false|
|fits_investigation|parquetpartitione...|      false|
|fits_investigation|parquetpartitione...|      false|
|fits_investigation|parquetp

## Identify the file

At this stage, we still need access to the source FITS file on the local filesystem (or S3) as we'll be using the SpectralCube library to extract the position  and spectra arrays.

We'll use the same file we used in Part 1.

Obviously, in production ETL flows, these processes would be automated.

In [20]:
FileName='image.restored.i.SB2338.V2.cube.fits'
localFile='/mnt1/FITs/' + FileName
hdfsFilePath='hdfs:///user/hduser/FITS_Files/image.restored.i.SB2338.V2.cube.fits' #+FileName

And create the sparkfits source file address (was target File in Part1)

In [16]:
sourceFile=hdfsFilePath.replace('.fits', '2.parquet')
sourceFile

'hdfs:///user/hduser/FITS_Files/image.restored.i.SB2338.V2.cube2.parquet'

Create the dataframe 

In [23]:
timer=Timer()
hdu=0
df_parquet = spark.read.format('parquet').load(sourceFile)
fits.info("/mnt1/FITs/image.restored.i.SB2338.V2.cube.fits")
hdu1=fits.open("/mnt1/FITs/image.restored.i.SB2338.V2.cube.fits" , memmap=True)
cubeHeader=hdu1[0].header
cubeWCS=WCS(cubeHeader)
cubeHz = SpectralCube.read(hdu1)

hdu1.close()
PartitionFileName = FileName

Filename: /mnt1/FITs/image.restored.i.SB2338.V2.cube.fits
No.    Name      Ver    Type      Cards   Dimensions   Format
  0  PRIMARY       1 PrimaryHDU      77   (5607, 5654, 1, 2592)   float32   




Save the header data

In [24]:
CreateFITSHeaderData(cubeHeader, PartitionFileName)

root
 |-- fits_file: string (nullable = true)
 |-- index: long (nullable = true)
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- comment: string (nullable = true)

Header data created


In [25]:
PartitionFileName

'image.restored.i.SB2338.V2.cube.fits'

In [26]:
polarisation=cubeHeader["NAXIS3"] # Polarisation 196608
raLength=cubeHz.header["NAXIS1"] # RA
decLength=cubeHz.header["NAXIS2"] # DEC
spectraLength=cubeHz.header["NAXIS3"] # Spectra (hx) Pixels 

numPolarisation=cubeHeader["NAXIS4"]

## Same as dec length, used in the GetBandUDF UDF functionsto calculate the polarisation and spectral band indexes.
## See
##        GetBandUDF(col("spi_index"), lit(frequencyBandLength*spectraLength)).alias("spi_pol"),\
##        GetBandUDF(col("spi_index"), lit(frequencyBandLength)).alias("spi_band") )\


frequencyBandLength=cubeHz.header["NAXIS2"] 


In [27]:
polarisation

1

Get the Sparkfits dataframe, and then populate the images table with the dataframe

In [28]:
timer.start()
print("Get column names")
cols = df_parquet.columns

# Add the .zipWithIndex column
print("Create the dataframe with the index we'll use for declination")
df = df_parquet.rdd.zipWithIndex().map(lambda row: (row[1],) + tuple(row[0])).toDF(["index"] + cols)
df.printSchema()
print("Dataframe created")

# Register the dataframe as a temporary table
df.registerTempTable('RawDataWithIndex')
print("Temporary raw data table created")



timer.stop()

Get column names
Create the dataframe with the index we'll use for declination
root
 |-- index: long (nullable = true)
 |-- Image: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- ImgIndex: long (nullable = true)

Dataframe created
Temporary raw data table created
Elapsed 281.5s, CPU 0.5s


#### Now we have to reset the raw data indexes

Change the index values from 0 - <number of declinatin values within one channel>. In the case of CUBE1, 0 - 2592.
    
We do this by creating a temporary table containing the smallest index value for a specific image channel, which is identified by the ImgIndex field in the dataframe. The next cell demonstrates this

In [29]:
sqlContext.sql("""
select ImgIndex, min(index) as loIdx 
from RawDataWithIndex 
group by ImgIndex
""").registerTempTable("CORRECTOR")

sqlContext.sql("select * from CORRECTOR").show()

+--------+--------+
|ImgIndex|   loIdx|
+--------+--------+
|    1126| 5099356|
|    2085| 6331880|
|     347|  597934|
|     416|13693377|
|     906| 8507025|
|    1779| 5690979|
|     295| 7861176|
|    2529|   73502|
|    2305| 5707941|
|     401|11980534|
|    1589| 9165714|
|     566|13089798|
|     406| 1272711|
|    1794| 5796771|
|    2077| 2011673|
|    1840|  134393|
|     789| 3506066|
|    2374| 9816598|
|      49| 8562263|
|     722|10381261|
+--------+--------+
only showing top 20 rows



Incorporating the index adjustments. We'll do a Broadcast join on the Corrector table and create the new FITS file as temporary table "NewImages"

In [30]:
sqlContext.sql("""
    select /*+ BROADCAST(CORRECTOR) */
        a.index,a.image,'{}' as filename, a.ImgIndex as band, b.loIdx,
        (a.index-b.loIdx) as new_index
    from RawDataWithIndex a
        inner join CORRECTOR b
            on a.ImgIndex=b.ImgIndex
""".format(PartitionFileName, PartitionFileName)).explain()

== Physical Plan ==
*(5) Project [index#1189L, image#1190, image.restored.i.SB2338.V2.cube.fits AS filename#1211, ImgIndex#1191L AS band#1212L, loIdx#1195L, (index#1189L - loIdx#1195L) AS new_index#1213L]
+- *(5) SortMergeJoin [ImgIndex#1191L], [ImgIndex#1216L], Inner
   :- *(2) Sort [ImgIndex#1191L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ImgIndex#1191L, 144)
   :     +- *(1) Filter isnotnull(ImgIndex#1191L)
   :        +- Scan ExistingRDD[index#1189L,Image#1190,ImgIndex#1191L]
   +- *(4) Sort [ImgIndex#1216L ASC NULLS FIRST], false, 0
      +- *(4) HashAggregate(keys=[ImgIndex#1216L], functions=[min(index#1214L)])
         +- Exchange hashpartitioning(ImgIndex#1216L, 144)
            +- *(3) HashAggregate(keys=[ImgIndex#1216L], functions=[partial_min(index#1214L)])
               +- *(3) Project [index#1214L, ImgIndex#1216L]
                  +- *(3) Filter isnotnull(ImgIndex#1216L)
                     +- Scan ExistingRDD[index#1214L,Image#1215,ImgIndex#1216L]


In [31]:
timer.start()
sqlContext.sql("""
    select /*+ BROADCAST(CORRECTOR) */
        a.index,a.image,'{}' as filename, a.ImgIndex as band, b.loIdx,
        (a.index-b.loIdx) as new_index
    from RawDataWithIndex a
        inner join CORRECTOR b
            on a.ImgIndex=b.ImgIndex
""".format(PartitionFileName, PartitionFileName)).registerTempTable("NewImages")
timer.stop()

print("New images temp table created!")

sqlContext.sql("select * from NewImages").show()

Elapsed 0.0s, CPU 0.0s
New images temp table created!
+-------+--------------------+--------------------+----+-------+---------+
|  index|               image|            filename|band|  loIdx|new_index|
+-------+--------------------+--------------------+----+-------+---------+
|7861176|[NaN, NaN, NaN, N...|image.restored.i....| 295|7861176|        0|
|7861177|[NaN, NaN, NaN, N...|image.restored.i....| 295|7861176|        1|
|7861178|[NaN, NaN, NaN, N...|image.restored.i....| 295|7861176|        2|
|7861179|[NaN, NaN, NaN, N...|image.restored.i....| 295|7861176|        3|
|7861180|[NaN, NaN, NaN, N...|image.restored.i....| 295|7861176|        4|
|7861181|[NaN, NaN, NaN, N...|image.restored.i....| 295|7861176|        5|
|7861182|[NaN, NaN, NaN, N...|image.restored.i....| 295|7861176|        6|
|7861183|[NaN, NaN, NaN, N...|image.restored.i....| 295|7861176|        7|
|7861184|[NaN, NaN, NaN, N...|image.restored.i....| 295|7861176|        8|
|7861185|[NaN, NaN, NaN, N...|image.restored.i

And writing the dataframe to the final parquet table

In [34]:
timer.start()
print("Writing to Parquet...")
sqlContext.sql("""
insert overwrite table sparkfits_images partition(spi_filename, spi_band )
select new_index, image, '{}', band from NewImages
""".format(PartitionFileName))
print("Done!")
timer.stop()

Writing to Parquet...
Done!
Elapsed 1945.1s, CPU 1.3s


## Create the dimensions and populate the dimension table

First, we create the cubes with the alternative positions and/or spectra values we're interested in.

In this example, we create two new SpectralCubes with spectral units of GHz and velocity in km s-1

In [35]:
timer.start()
print("Create the alternative spectral cubes for the differing spectral arrays")
cubeVel = cubeHz.with_spectral_unit(u.km / u.s, velocity_convention='radio' ,rest_value=200 * u.Hz)
cubeGHz = cubeHz.with_spectral_unit(u.GHz, velocity_convention='radio' ,rest_value=200 * u.GHz)
print("Alternative cubes created")
timer.stop()

Create the alternative spectral cubes for the differing spectral arrays
Alternative cubes created
Elapsed 0.1s, CPU 0.1s


Then we extract the data and write the dimensions to the dimension table

In [43]:
for i, c in enumerate([cubeHz]):
    timer.start()
    print(c.header["ORIGIN"])

    cubePixelValue = c.header["BUNIT"]
    cubeRaDim = c.header["NAXIS1"]
    cubeDecDim = c.header["NAXIS2"]
    cubeSpectraDim = c.header["NAXIS3"]
    cubeRACoordinate = c.header["CTYPE1"]
    cubeDecCoordinate = c.header["CTYPE2"]
    cubeSpectraCoordinate = c.header["CTYPE3"]
    cubeRAUnits = c.header["CUNIT1"]
    cubeDecUnits = c.header["CUNIT2"]
    cubeSpectraUnits = c.header["CUNIT3"]
    print(cubePixelValue, cubeSpectraCoordinate, cubeSpectraUnits)
    timer.start()

    ra, dec, specArray = GetDimensions(c, PartitionFileName, cubeDecCoordinate, cubeSpectraUnits, cubeDecDim)
    #ra, dec, specArray, DecDF = FastGetCubePositions(c, PartitionFileName, cubeDecCoordinate, cubeSpectraUnits, cubeDecDim)


    if i == 0:
        print("Creating ra and dec detail arrays")
        CreateAndSaveArrays(ra,  PartitionFileName, cubeRACoordinate)
        CreateAndSaveArrays(dec, PartitionFileName, cubeDecCoordinate)

    print("Creating the spectral detail array")
    CreateAndSaveArrays(specArray, PartitionFileName, cubeSpectraCoordinate, cubeSpectraUnits) 
    timer.stop()

ASKAPSoft
beam-1 Jy FREQ Hz
arrays created
Creating ra and dec detail arrays
Creating the spectral detail array
Elapsed 32.2s, CPU 0.2s


In [39]:
print("All done and dusted!")

All done and dusted!
