In [7]:
import fog_x 
import os
from logging import getLogger
import numpy as np
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # or any {'0', '1', '2'}
import tensorflow as tf

class BaseLoader():
    def __init__(self, data_path):
        super(BaseLoader, self).__init__()
        self.data_dir = data_path
        self.logger = getLogger(__name__)
        self.index = 0


    def __len__(self):
        raise NotImplementedError

    def __iter___(self):
        raise NotImplementedError

class BaseExporter():
    def __init__(self):
        super(BaseExporter, self).__init__()
        self.logger = getLogger(__name__)

    def export(self, loader: BaseLoader, output_path: str):
        raise NotImplementedError
    

In [8]:
class RTXLoader(BaseLoader):
    def __init__(self, data_path, split):
        super(RTXLoader, self).__init__(data_path)
        import tensorflow_datasets as tfds

        builder = tfds.builder_from_directory(data_path)

        self.ds = builder.as_dataset(split=split)
        # https://www.determined.ai/blog/tf-dataset-the-bad-parts

    def __len__(self):
        return len(self.ds)
    
    def __next__(self):
        if self.index < len(self.ds):
            self.index += 1
            nested_dataset = self.ds.__iter__()
            trajectory = list(nested_dataset)[0]["steps"]
            ret = []
            # Iterate through the outer dataset
            for step_data in trajectory:
                step = {}
                for dataset_key, element in step_data.items():
                    # print(np.array(element))
                    if dataset_key == "observation":
                        step["observation"] = {}
                        for obs_key, obs_element in element.items():
                            step["observation"][obs_key] = np.array(obs_element)
                    elif dataset_key == "action":
                        step["action"] = {}
                        for action_key, action_element in element.items():
                            step["action"][action_key] = np.array(action_element)
                    else:
                        step[dataset_key] = np.array(element)
                ret.append(step)
            return ret
        else:
            raise StopIteration
    

    
    def __iter__(self):
        return self


class RTXExporter(BaseExporter):
    def __init__(self):
        super(RTXExporter, self).__init__()

    def export(self, loader: BaseLoader, output_path: str):
        raise NotImplementedError

In [9]:
import av
import pickle 

class MKVLoader(BaseLoader):
    def __init__(self, data_path):
        super(MKVLoader, self).__init__(data_path)
        self.files = [data_path + f for f in os.listdir(data_path) if f.endswith('.mkv')]
        self.index = 0

    def __len__(self):
        return len(self.files)
    
    def __iter__(self):
        return self

    def __next__(self):
        if self.index < len(self):
            result = self.files[self.index]
            self.index += 1
            return self._parse_mkv_file(result)
        else:
            raise StopIteration
    
    def _parse_mkv_file(self, filename):
        print(filename)
        input_container = av.open(filename)
        video_stream1 = input_container.streams.video[0] 
        video_stream1.thread_type = 'AUTO'
        video_stream2 = input_container.streams.video[1] 
        video_stream2.thread_type = 'AUTO'
        depth_stream = input_container.streams.video[2] 
        depth_stream.thread_type = 'AUTO'
        data_stream = input_container.streams[3] 

        decoded_stream_1 = []
        decoded_stream_2 = []
        decoded_stream_depth = []
        decoded_stream_data = []

        pkt_counter = 0
        for packet in input_container.demux(video_stream1, video_stream2, depth_stream, data_stream):
            pkt_counter += 1
            if packet.stream.index == video_stream1.index: 
                frame = packet.decode()
                if frame:
                    for f in frame:
                        image = f.to_ndarray(format='rgb24')
                        decoded_stream_1.append(image)
            elif packet.stream.index == video_stream2.index:
                frame = packet.decode()
                if frame:
                    for f in frame:
                        image = f.to_ndarray(format='rgb24')
                        decoded_stream_2.append(image)
            elif packet.stream.index == depth_stream.index:
                frame = packet.decode()
                if frame:
                    for f in frame:
                        image = f.to_ndarray(format='gray')
                        decoded_stream_depth.append(image)
            elif packet.stream.index == data_stream.index:
                packet_in_bytes = bytes(packet)
                if packet_in_bytes:
                    non_dict = pickle.loads(packet_in_bytes)
                    decoded_stream_data.append(non_dict)
            else:
                print("Unknown stream")
        print(pkt_counter, len(decoded_stream_1), len(decoded_stream_2), len(decoded_stream_depth), len(decoded_stream_data))
        input_container.close()

        

class MKVExporter(BaseExporter):
    def __init__(self):
        super(MKVExporter, self).__init__()

    # Function to create a frame from numpy array
    def create_frame(self, image_array, stream):
        frame = av.VideoFrame.from_ndarray(np.array(image_array), format='rgb24')
        frame.pict_type = 'NONE'
        frame.time_base = stream.time_base
        return frame
    
    # Function to create a frame from numpy array
    def create_frame_depth(self, image_array, stream):
        image_array = np.array(image_array)
        # if float, convert to uint8
        if image_array.dtype == np.float32:
            image_array = (image_array * 255).astype(np.uint8)
        # if 3 dim, convert to 2 dim
        if len(image_array.shape) == 3:
            image_array = image_array[:,:,0]
        frame = av.VideoFrame.from_ndarray(image_array, format='gray')
        frame.pict_type = 'NONE'
        frame.time_base = stream.time_base
        return frame

    def export(self, loader: BaseLoader, output_path: str):
        # Create an output container
        i = -1
        for trajectory in loader:
            i += 1
            output = av.open(f'{output_path}/output_{i}.mkv', mode='w')
            print(f'{output_path}/output_{i}.mkv')
            # Define video streams (assuming images are 640x480 RGB)
            video_stream_1 = output.add_stream('libx264', rate=1)
            video_stream_1.width = 640
            video_stream_1.height = 480
            video_stream_1.pix_fmt = 'yuv420p'

            video_stream_2 = output.add_stream('libx264', rate=1)
            video_stream_2.width = 640
            video_stream_2.height = 480
            video_stream_2.pix_fmt = 'yuv420p'

            # Define custom data stream for vectors
            depth_stream = output.add_stream('libx264', rate=1)

            data_stream = output.add_stream('rawvideo', rate=1)

            ts = 0
            # convert step data to stream
            for step in trajectory:
                obesrvation = step["observation"].copy()
                obesrvation.pop("image")
                obesrvation.pop("hand_image")
                obesrvation.pop("image_with_depth")
                non_image_data_step = step.copy()
                non_image_data_step["observation"] = obesrvation

                non_image_data_bytes = pickle.dumps(non_image_data_step)
                packet = av.Packet(non_image_data_bytes)
                packet.stream = data_stream
                packet.pts = ts
                output.mux(packet)

                image =np.array(step["observation"]["image"])
                # Create a frame from the numpy array
                frame = self.create_frame(image, video_stream_1)
                frame.pts = ts
                packet = video_stream_1.encode(frame)
                
                output.mux(packet)

                hand_image =np.array(step["observation"]["hand_image"])
                # Create a frame from the numpy array
                frame = self.create_frame(hand_image, video_stream_2)
                frame.pts = ts
                packet = video_stream_2.encode(frame)
                output.mux(packet)

                # # Create a frame from the numpy array
                frame = self.create_frame_depth(step["observation"]["image_with_depth"], depth_stream)
                # frame.pts = ts
                # Encode the frame
                packet = depth_stream.encode(frame)
                # Write the packet to the output file
                output.mux(packet)

                ts += 1

            
            # Flush the remaining frames
            for packet in video_stream_1.encode():
                output.mux(packet)

            for packet in video_stream_2.encode():
                output.mux(packet)

            for packet in depth_stream.encode():
                output.mux(packet)
            output.close()



In [10]:
number_of_samples = 1
def iterate_dataset(loader: BaseLoader, number_of_samples):
    for i, data in enumerate(loader): 
        list(dict(data)["steps"])
        if i == number_of_samples:
            break

In [15]:
import os
import pickle
import pandas as pd
import fastparquet as fp
import numpy as np
from logging import getLogger
import fog_x


class ParquetExporter(BaseExporter):
    def __init__(self):
        super(ParquetExporter, self).__init__()
        self.logger = getLogger(__name__)

    def _serialize(self, data):
        return pickle.dumps(np.array(data))

    def _step_data_to_df(self, step_data):
        step = {}
        for dataset_key, element in step_data.items():
            if dataset_key == "observation":
                for obs_key, obs_element in element.items():
                    step[obs_key] = self._serialize(obs_element)
            elif dataset_key == "action":
                for action_key, action_element in element.items():
                    step[action_key] = self._serialize(action_element)
            else:
                step[dataset_key] = self._serialize(element)
        return step

    def export(self, loader: BaseLoader, output_path: str):
        if not os.path.exists(output_path):
            os.makedirs(output_path)
        i = 0
        for steps_data in loader:
            step_data_as_df = [self._step_data_to_df(step_data) for step_data in steps_data]
            combined_df = pd.DataFrame(step_data_as_df)
            output_file = os.path.join(output_path, f'output_{i}.parquet')
            fp.write(output_file, combined_df)
            print(f'Exported {output_file}')
            i += 1


class ParquetLoader(BaseLoader):
    def __init__(self, data_path):
        super(ParquetLoader, self).__init__(data_path)
        self.files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.parquet')]
        self.index = 0

    def __len__(self):
        return len(self.files)
    
    def __iter__(self):
        return self

    def __next__(self):
        if self.index < len(self.files):
            result = self._load_file(self.files[self.index])
            self.index += 1
            return result
        else:
            raise StopIteration

    def _parse_parquet_file(self, filename):
        df = fp.ParquetFile(filename).to_pandas()
        steps = [dict(row) for _, row in df.iterrows()]
        return steps

number_of_samples = 1

rtx_loader = RTXLoader(os.path.expanduser("~/datasets/berkeley_autolab_ur5/0.1.0"), split=f'train[:{number_of_samples}]')

output_path = "/home/davidh/fog_x_fork/examples/dataloader/pq_output"
os.makedirs(output_path, exist_ok=True)

exporter = ParquetExporter()
exporter.export(rtx_loader, output_path)

parquet_loader = ParquetLoader(output_path)

I 2024-06-23 06:05:40,410 dataset_info.py:595] Load dataset info from /home/davidh/datasets/berkeley_autolab_ur5/0.1.0


I 2024-06-23 06:05:40,525 logging_logger.py:49] Constructing tf.data.Dataset berkeley_autolab_ur5 for split train[:1], from /home/davidh/datasets/berkeley_autolab_ur5/0.1.0


Exported /home/davidh/fog_x_fork/examples/dataloader/pq_output/output_0.parquet


In [16]:
import os
import pickle
import pandas as pd
import pyarrow.feather as feather
import numpy as np
from logging import getLogger

class FeatherExporter(BaseExporter):
    def __init__(self):
        super(FeatherExporter, self).__init__()
        self.logger = getLogger(__name__)

    def _serialize(self, data):
        return pickle.dumps(np.array(data))

    def _step_data_to_df(self, step_data):
        step = {}
        for dataset_key, element in step_data.items():
            if dataset_key == "observation":
                for obs_key, obs_element in element.items():
                    step[obs_key] = self._serialize(obs_element)
            elif dataset_key == "action":
                for action_key, action_element in element.items():
                    step[action_key] = self._serialize(action_element)
            else:
                step[dataset_key] = self._serialize(element)
        return step

    def export(self, loader: BaseLoader, output_path: str):
        if not os.path.exists(output_path):
            os.makedirs(output_path)
        i = 0
        for steps_data in loader:
            step_data_as_df = [self._step_data_to_df(step_data) for step_data in steps_data]
            combined_df = pd.DataFrame(step_data_as_df)
            output_file = os.path.join(output_path, f'output_{i}.feather')
            feather.write_feather(combined_df, output_file)
            print(f'Exported {output_file}')
            i += 1
import os
import pickle
import pandas as pd
import pyarrow.feather as feather
import numpy as np
from logging import getLogger

class FeatherLoader(BaseLoader):
    def __init__(self, data_path):
        super(FeatherLoader, self).__init__(data_path)
        self.files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.feather')]
        self.index = 0

    def __len__(self):
        return len(self.files)
    
    def __iter__(self):
        return self

    def __next__(self):
        if self.index < len(self.files):
            result = self._load_file(self.files[self.index])
            self.index += 1
            return result
        else:
            raise StopIteration

    def _load_file(self, filename):
        df = feather.read_feather(filename)
        steps = [dict(row) for _, row in df.iterrows()]
        return steps
import os
from fog_x import RTXLoader

number_of_samples = 1

rtx_loader = RTXLoader(os.path.expanduser("~/datasets/berkeley_autolab_ur5/0.1.0"), split=f'train[:{number_of_samples}]')

output_path = os.path.expanduser("~") + "/fog_x/examples/dataloader/feather_output/"
os.makedirs(output_path, exist_ok=True)
exporter = FeatherExporter()
exporter.export(rtx_loader, output_path)

feather_loader = FeatherLoader(output_path)


ImportError: cannot import name 'BaseExporter' from 'fog_x' (/home/davidh/miniconda3/envs/fog_x_env/lib/python3.8/site-packages/fog_x/__init__.py)

In [17]:
os.path.getsize("/home/davidh/fog_x_fork/examples/dataloader/pq_output/output_0.parquet")

218424901

In [20]:
import os
import pickle
import pandas as pd
import pyarrow.feather as feather
import numpy as np
from logging import getLogger


class FeatherExporter(BaseExporter):
    def __init__(self):
        super(FeatherExporter, self).__init__()
        self.logger = getLogger(__name__)

    def _serialize(self, data):
        return pickle.dumps(np.array(data))

    def _step_data_to_df(self, step_data):
        step = {}
        for dataset_key, element in step_data.items():
            if dataset_key == "observation":
                for obs_key, obs_element in element.items():
                    step[obs_key] = self._serialize(obs_element)
            elif dataset_key == "action":
                for action_key, action_element in element.items():
                    step[action_key] = self._serialize(action_element)
            else:
                step[dataset_key] = self._serialize(element)
        return step

    def export(self, loader: BaseLoader, output_path: str):
        if not os.path.exists(output_path):
            os.makedirs(output_path)
        i = 0
        for steps_data in loader:
            step_data_as_df = [self._step_data_to_df(step_data) for step_data in steps_data]
            combined_df = pd.DataFrame(step_data_as_df)
            output_file = os.path.join(output_path, f'output_{i}.feather')
            feather.write_feather(combined_df, output_file)
            print(f'Exported {output_file}')
            i += 1

class FeatherLoader(BaseLoader):
    def __init__(self, data_path):
        super(FeatherLoader, self).__init__(data_path)
        self.files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.feather')]
        self.index = 0

    def __len__(self):
        return len(self.files)
    
    def __iter__(self):
        return self

    def __next__(self):
        if self.index < len(self.files):
            result = self._load_file(self.files[self.index])
            self.index += 1
            return result
        else:
            raise StopIteration

    def _load_file(self, filename):
        df = feather.read_feather(filename)
        steps = [dict(row) for _, row in df.iterrows()]
        return steps

number_of_samples = 1

rtx_loader = RTXLoader(os.path.expanduser("~/datasets/berkeley_autolab_ur5/0.1.0"), split=f'train[:{number_of_samples}]')

output_path = os.path.expanduser("~") + "/fog_x/examples/dataloader/feather_output/"
os.makedirs(output_path, exist_ok=True)

exporter = FeatherExporter()
exporter.export(rtx_loader, output_path)

feather_loader = FeatherLoader(output_path)



I 2024-06-23 06:09:07,749 dataset_info.py:595] Load dataset info from /home/davidh/datasets/berkeley_autolab_ur5/0.1.0
I 2024-06-23 06:09:07,996 logging_logger.py:49] Constructing tf.data.Dataset berkeley_autolab_ur5 for split train[:1], from /home/davidh/datasets/berkeley_autolab_ur5/0.1.0


Exported /home/davidh/fog_x/examples/dataloader/feather_output/output_0.feather


In [21]:
os.path.getsize("/home/davidh/fog_x/examples/dataloader/feather_output/output_0.feather")

91502858

In [None]:

class ParquetLoader():
    def __init__(self, data_path):
        super(ParquetLoader, self).__init__()
        self.data_dir = data_path
        self.logger = getLogger(__name__)

    def __len__(self):
        raise NotImplementedError

    def __iter___(self):
        raise NotImplementedError


In [7]:

exporter = MKVExporter()
output_path = "/home/davidh/fog_x_fork/examples/dataloader/mkv_output/"
exporter.export(rtx_loader, output_path)

In [8]:
mkv_loader = MKVLoader(output_path)
def iterate_dataset(loader: BaseLoader, number_of_samples = 50):
    for i, data in enumerate(loader): 
        if i == number_of_samples:
            break
iterate_dataset(mkv_loader, number_of_samples)

/home/davidh/fog_x_fork/examples/dataloader/mkv_output/output_11.mkv
408 101 101 101 101
/home/davidh/fog_x_fork/examples/dataloader/mkv_output/output_16.mkv
436 108 108 108 108


In [9]:
import os
import pickle
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
from fog_x import RTXLoader, BaseLoader, BaseExporter

class ParquetExporter(BaseExporter):
    def __init__(self):
        super(ParquetExporter, self).__init__()

    def _serialize(self, data):
        return pickle.dumps(np.array(data))

    def _step_data_to_df(self, step_data):
        step = {}
        for dataset_key, element in step_data.items():
            if dataset_key == "observation":
                for obs_key, obs_element in element.items():
                    step[obs_key] = self._serialize(obs_element)
            elif dataset_key == "action":
                for action_key, action_element in element.items():
                    step[action_key] = self._serialize(action_element)
            else:
                step[dataset_key] = self._serialize(element)
        return step

    def export(self, loader: BaseLoader, output_path: str):
        for steps_data in loader:
            step_data_as_df = [self._step_data_to_df(step_data) for step_data in steps_data]
            combined_df = pd.DataFrame(step_data_as_df)
            table = pa.Table.from_pandas(combined_df)
            pq.write_table(table, output_path)

class ParquetLoader(BaseLoader):
    def __init__(self, data_path):
        super(ParquetLoader, self).__init__(data_path)
        self.files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.parquet')]
        self.index = 0

    def __len__(self):
        return len(self.files)
    
    def __iter__(self):
        return self

    def __next__(self):
        if self.index < len(self.files):
            result = self._load_file(self.files[self.index])
            self.index += 1
            return result
        else:
            raise StopIteration

    def _load_file(self, filename):
        table = pq.read_table(filename)
        df = table.to_pandas()
        steps = [dict(row) for _, row in df.iterrows()]
        return steps

# Example usage
rtx_loader = RTXLoader(os.path.expanduser("~/datasets/berkeley_autolab_ur5/0.1.0"), split='train[:51]')
exporter = ParquetExporter()
exporter.export(rtx_loader, "output.parquet")

parquet_loader = ParquetLoader("/home/davidh/fog_x_fork/benchmarking/")
print(parquet_loader._load_file("/home/davidh/fog_x_fork/benchmarking/output.parquet"))


ImportError: cannot import name 'RTXLoader' from 'fog_x' (/home/davidh/miniconda3/envs/fog_x_env/lib/python3.8/site-packages/fog_x/__init__.py)