In [None]:
import numpy as np
import pandas as pd
from glob import glob
import h5py
import pyarrow.parquet as pq
import pyarrow as pa
import re

from pyspark import SparkContext
from pyspark.sql import SparkSession
from itertools import chain
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [None]:
# Collect the path of every session file in the HDF5 folder (any name ending
# in 'hdf5') and order the paths lexicographically.
my_dir = glob('project_data/LH014/hdf5/*hdf5')
my_dir.sort()
my_dir

In [None]:
# Width of a single channel: the 19.600-wide band split evenly over 1024 channels.
freq_range_per_channel = 19.600 / 1024


def _channel_start_frequencies(first_start):
    """Map 'channel_1' .. 'channel_1024' to the starting frequency of each channel.

    ``first_start`` is the starting frequency of channel 1; every following
    channel starts one ``freq_range_per_channel`` higher than the previous one.
    """
    table = {}
    for idx in range(1024):
        table['channel_' + str(idx + 1)] = first_start + idx * freq_range_per_channel
    return table


# Tuning 1 starts at 20.0, tuning 2 at 28.0; both use the same channel width.
channel_dict_tune1 = _channel_start_frequencies(20.0)
channel_dict_tune2 = _channel_start_frequencies(28.0)


In [None]:
# np.array(h5py.File('project_data/LH014/hdf5/111.hdf5')['Observation1']['Tuning1']['V'])

In [None]:
def flatten_to_parquet(session_number):
    """Flatten one session's HDF5 spectra into long-format Parquet files.

    Reads ``project_data/LH014/hdf5/<session_number>.hdf5`` and, for every
    tuning ('Tuning1', 'Tuning2') and polarization ('I', 'V') stored under
    ``Observation1``, melts the (time x channel) array into one row per
    (time, channel) sample. The result is written in batches of at most
    50,000 time steps to
    ``project_data/LH014/parquets/<session_number>_<tuning>_<pol>_<batch>.parquet``.

    Every output row has the columns ``time``, ``channel``, ``value``,
    ``tuning``, ``polarization`` and ``frequency``; ``frequency`` is the
    channel's starting frequency looked up in ``channel_dict_tune1`` (for
    'Tuning1') or ``channel_dict_tune2`` (otherwise).

    Parameters
    ----------
    session_number : int or str
        Session identifier; it is the stem of the HDF5 file name.

    Returns
    -------
    None
        The function only writes files and prints
        ``(tuning, polarization, batches_written_so_far)`` after each batch.
    """
    import os  # local import so the function does not depend on another cell

    filename = 'project_data/LH014/hdf5/' + str(session_number) + '.hdf5'
    out_dir = 'project_data/LH014/parquets'
    # Writing the first batch would fail if the output folder did not exist yet.
    os.makedirs(out_dir, exist_ok=True)

    # Open the file once, read-only, and make sure the handle is released even
    # if a later step raises (previously it was reopened per dataset and never
    # closed).
    with h5py.File(filename, 'r') as hdf:
        observation = hdf['Observation1']

        # Time stamps as a column vector so they can be appended to the data.
        t = np.array(observation['time'])[:, np.newaxis]

        for tune in ['Tuning1', 'Tuning2']:
            # Channel label -> starting frequency table for this tuning.
            channel_to_freq = channel_dict_tune1 if tune == 'Tuning1' else channel_dict_tune2

            for pol in ['I', 'V']:
                data = np.array(observation[tune][pol])
                # Append the time column as the last column of the array.
                np_arr = np.concatenate((data, t), axis=1)

                # One named column per channel actually present in the data,
                # plus the trailing 'time' column.
                n_channels = data.shape[1]
                my_dict = {'channel_' + str(i + 1): np_arr[:, i] for i in range(n_channels)} | {'time': np_arr[:, -1]}
                pa_table = pa.table(my_dict)

                counter = 0
                # Process at most 50,000 time steps at a time to bound the
                # size of the melted (long-format) frame held in memory.
                for batch in pa_table.to_batches(max_chunksize=50_000):
                    df = batch.to_pandas()
                    # Wide -> long. `var_name` must be a scalar for ordinary
                    # (non-MultiIndex) columns; recent pandas versions reject
                    # a list-like here.
                    df = df.melt(id_vars=['time'], var_name='channel')
                    df['tuning'] = tune
                    df['polarization'] = pol
                    df['frequency'] = df.channel.map(channel_to_freq)
                    df.to_parquet(f'{out_dir}/{str(session_number)}_{tune}_{pol}_{counter}.parquet')
                    counter += 1
                    # Progress report: batches written so far for this tuning/polarization.
                    print((tune, pol, counter))


In [None]:
%%time
# Convert session 241 on its own; the %%time cell magic above reports how long
# a single session takes.
flatten_to_parquet(241)

In [None]:
# Find the sessions that already have converted output in the parquet folder.
# The folder name is lower-case 'project_data', matching every other path in
# this notebook; the previous 'Project_data' spelling matched nothing on
# case-sensitive filesystems, so every session was converted again.
# NOTE(review): a session is detected through a Tuning1/I file whose batch
# counter starts with 1, so a session that produced only batch 0 is never
# reported as done -- confirm this is intended.
_done_files = sorted(glob('project_data/LH014/parquets/*_Tuning1_I_1*'))

# Captures the session number that precedes '_Tuning1' in the file name.
# Both '/' and '\' are accepted as the path separator.
pattern_done = r"parquets[\\/](\d+)_Tuning1"

# Unique session numbers as integers, sorted for a stable display; names that
# do not match the pattern are skipped instead of raising IndexError.
_done_matches = (re.search(pattern_done, path) for path in _done_files)
done_dir = sorted({int(m.group(1)) for m in _done_matches if m is not None})
done_dir

In [None]:
# Captures the session number from a '<session>.hdf5' file name. Both '/' and
# '\' are accepted as the path separator.
pattern = r"hdf5[\\/](\d+)\.hdf5"

# Session numbers of all HDF5 files found earlier. Files whose name is not
# '<digits>.hdf5' are skipped rather than aborting the cell with an IndexError
# (the directory glob matches any name ending in 'hdf5').
_session_matches = (re.search(pattern, path) for path in my_dir)
_all_sessions = [int(m.group(1)) for m in _session_matches if m is not None]

# Keep only the sessions that have not been converted yet; a set makes each
# membership test constant-time.
_done_sessions = set(done_dir)
to_do = [session for session in _all_sessions if session not in _done_sessions]

In [None]:
%%time
# Convert every outstanding session, announcing each one before it starts.
for session_number in to_do:
    print(session_number)
    flatten_to_parquet(session_number)

In [None]:
# Start (or reuse) the Spark session for this notebook with a 2 GB driver.
# Alternative settings noted for this cell but currently not applied:
#   spark.executor.memory = 3g or 8g
#   spark.executor.cores  = 3 or 7
#   spark.cores.max       = 3
#   spark.driver.memory   = 8g
_spark_builder = SparkSession.builder.appName("Your App Name")
_spark_builder = _spark_builder.config("spark.driver.memory", "2g")
spark = _spark_builder.getOrCreate()

# Handle to the session's underlying SparkContext.
sc = spark.sparkContext

In [None]:
# Load every Parquet batch written for session 111. File names have the form
# '<session>_<tuning>_<polarization>_<batch>.parquet', so the underscore after
# the session number keeps the glob from also picking up sessions whose number
# merely starts with 111 (e.g. 1110 or 1112).
df = spark.read.parquet('project_data/LH014/parquets/111_*.parquet')
df.printSchema()

In [None]:
# Total number of rows loaded: one row per (time, channel) sample for each
# tuning/polarization combination written by the converter.
df.count()

In [None]:
# Preview the first rows of the loaded data (show() prints 20 rows by default).
df.show()

In [None]:
# Sanity check: number of distinct channel labels. The frequency tables define
# 'channel_1' .. 'channel_1024', so 1024 is expected if every channel was written.
df.select('channel').distinct().count()

In [None]:
# Sanity check: number of distinct starting frequencies. Each tuning maps its
# 1024 channels through its own table, so up to 2 * 1024 values are expected.
df.select('frequency').distinct().count()

In [None]:
# Sanity check: the converter writes polarizations 'I' and 'V', so 2 is expected.
df.select('polarization').distinct().count()

In [None]:
# Sanity check: the converter writes 'Tuning1' and 'Tuning2', so 2 is expected.
df.select('tuning').distinct().count()

In [None]:
# Stop the Spark session once the checks above are finished.
spark.stop()