# DictFS

Turn a dictionary into a file system.

In [5]:
from fs.base import FS
from fs.subfs import SubFS
from fs.info import Info
from os.path import basename, dirname
from fs.errors import ResourceNotFound, ResourceReadOnly
from fs.errors import RemoveRootError
from fs.errors import DirectoryExists, DirectoryNotEmpty
from fs.errors import FileExpected, DirectoryExpected
from io import BytesIO
from Dcel import Dcel

# Value types that DictFS reports as directories.
DirectoryTypes = (dict, list)

# Value types that can be turned into a byte stream (str and bytes).
# NOTE(review): this tuple is not referenced by the code visible in this file.
ByteableTypes = (bytes, str)

# Values of the DictFS ``mode`` argument under which modification is allowed.
WriteModes = ("w", "a")

class DictFS(FS):
    """Expose a nested dictionary through the pyfilesystem ``FS`` interface.

    Values whose exact type is listed in ``DirectoryTypes`` (``dict`` and
    ``list``) are reported as directories; every other value is reported as
    a file. Only ``str`` and ``bytes`` values can be opened with ``openbin``.

    The instance is writable when it was constructed with a ``mode`` listed
    in ``WriteModes``. On a read-only instance ``makedir``, ``remove``,
    ``removedir`` and ``setinfo`` return silently without changing anything,
    whereas ``openbin`` (when opened for writing) and ``writetext`` raise
    ``ResourceReadOnly``.
    """

    def __init__(self,fsdict,mode='a'):
        """Wrap *fsdict*.

        Args:
            fsdict: The object to serve. Normally a ``dict``; any other type
                is accepted, in which case every path lookup returns the
                object itself (see ``_pathwalk``).
            mode: Access mode; the filesystem is writable when this value is
                in ``WriteModes``.
        """
        self.fsdict = fsdict
        self._mode = mode
        super().__init__()

    @property
    def _addr(self):
        """The underlying object served by this filesystem."""
        return self.fsdict

    @property
    def _writable(self):
        """True when the instance was created with a mode in ``WriteModes``."""
        return self._mode in WriteModes

    @staticmethod
    def _is_dir(value):
        """Return True when *value* is presented as a directory.

        Uses an exact type match, so instances of ``dict``/``list``
        subclasses are treated as files.
        """
        return type(value) in DirectoryTypes

    @staticmethod
    def _split(path):
        """Split *path* into ``(parent_path, entry_name)``.

        Trailing slashes are dropped first so that ``'colors/red/'`` names
        the entry ``'red'`` rather than an empty name.
        """
        trimmed = path.rstrip('/')
        return dirname(trimmed), basename(trimmed)

    def _pathwalk(self,
                  path,
                  target=None
                 ):
        """Resolve *path* to the value it names.

        Args:
            path: Slash-separated path. ``None``, ``''``, ``'.'`` and any
                run of slashes all name the starting object.
            target: Object to start from; defaults to ``self.fsdict``.

        Returns:
            The value found at *path*.

        Raises:
            ResourceNotFound: A path segment is not present in its parent.
        """
        if target is None:
            target = self.fsdict
            if type(target) is not dict:
                # UPDATE 8/7/2022 - raygan - Return fsdict without lookup.
                # This enables Fudge glob where fu/'*' needs to return a list.
                return target

        if path is None:
            return target

        # Normalise before the root test so that '//' is also the root.
        path = path.strip("/")
        if path in ("", "."):
            return target

        seg, sep, rest = path.partition('/')
        nextpath = rest if sep else None

        # Look up seg within target; target may be a dict or a Dcel.
        if seg not in target:
            raise ResourceNotFound(path)

        nexttarget = target[seg]

        # Only plain dicts are descended into.
        # NOTE(review): when segments remain but nexttarget is not a dict,
        # the remainder of the path is ignored and nexttarget is returned.
        # Kept as-is because callers may depend on it - confirm.
        if nextpath is not None and type(nexttarget) is dict:
            return self._pathwalk(nextpath, nexttarget)
        return nexttarget


    ####################
    # pyfilesystem Api #
    ####################

    def getinfo(self, path, namespaces=None):
        """Return an ``Info`` object describing the entry at *path*.

        The ``basic`` namespace (``name``, ``is_dir``) is always present.
        When ``'dcel'`` is requested in *namespaces*, the raw info also
        carries the stored value itself under ``raw['dcel']['value']``.

        Raises:
            ResourceNotFound: *path* does not exist.
        """
        target = self._pathwalk(path)

        info = Info({"basic":{
            "name": basename(path.strip('/')),
            "is_dir": self._is_dir(target)
        }})

        if namespaces is not None and "dcel" in namespaces:
            # Attached after construction so the stored object is passed
            # through by reference.
            info.raw["dcel"] = {"value": target}
        return info

    def listdir(self, path):
        """List the entries of the directory at *path*.

        For a ``dict`` this is its keys; for a ``list`` it is a copy of the
        list's elements. A non-directory yields an empty list rather than
        an error.

        Raises:
            ResourceNotFound: *path* does not exist.
        """
        target = self._pathwalk(path)
        if self._is_dir(target):
            return list(target)
        return []

    def makedir(self, path, permissions=None, recreate=False):
        """Create an empty directory (``dict``) at *path*.

        Args:
            path: Path of the directory to create.
            permissions: Accepted for API compatibility; not used.
            recreate: When true, an already existing entry is left untouched
                instead of raising ``DirectoryExists``.

        Returns:
            A ``SubFS`` rooted at *path*, or ``None`` when the filesystem is
            read-only (nothing is created in that case).

        Raises:
            ResourceNotFound: The parent path does not exist.
            DirectoryExists: The entry exists and *recreate* is false.
        """
        if not self._writable:
            # raise ReadOnlyFilesystem
            return

        parpath, entryname = self._split(path)
        target = self._pathwalk(parpath)
        if target is None:
            raise ResourceNotFound(path)

        # An empty entry name means the root itself, which always exists.
        already_there = (not entryname) or entryname in target
        if already_there:
            if not recreate:
                raise DirectoryExists(path)
            # recreate=True: keep the existing entry and its contents.
        else:
            target[entryname] = dict()

        return SubFS(self, path)

    def openbin(self, path, mode='r', buffering=-1, **options):
        """Open the file at *path* as an in-memory binary stream.

        The returned ``BytesIO`` holds a copy of the stored ``bytes`` (or
        the UTF-8 encoding of a stored ``str``); data written to it is not
        copied back into the dictionary.

        Args:
            path: Path of the file to open.
            mode: Open mode. A mode containing ``w``, ``a``, ``x`` or ``+``
                requires a writable filesystem.
            buffering: Accepted for API compatibility; not used.
            **options: Accepted for API compatibility; not used.

        Raises:
            ResourceReadOnly: A write mode was requested on a read-only
                filesystem.
            ResourceNotFound: *path* does not exist.
            FileExpected: The entry is a directory or is neither ``str`` nor
                ``bytes``.
        """
        # Inspect the individual flags: 'wb', 'ab', 'r+' are write modes too.
        wants_write = any(flag in mode for flag in ('w', 'a', 'x', '+'))
        if wants_write and not self._writable:
            raise ResourceReadOnly(path)

        parpath, entryname = self._split(path)
        parent = self._pathwalk(parpath)
        try:
            target = parent[entryname]
        except (KeyError, TypeError) as err:
            # Missing key, or the parent is not a container keyed by name.
            raise ResourceNotFound(path) from err

        if self._is_dir(target):
            raise FileExpected(path)

        if isinstance(target, str):
            buf = target.encode(encoding='utf-8')
        elif isinstance(target, bytes):
            buf = target
        else:
            raise FileExpected(path)

        return BytesIO(buf)

    def remove(self, path):
        """Remove the file at *path*.

        Does nothing on a read-only filesystem.

        Raises:
            ResourceNotFound: *path* does not exist.
            FileExpected: *path* is a directory.
        """
        if not self._writable:
            # raise ReadOnlyFilesystem
            return

        parpath, entryname = self._split(path)
        pardir = self._pathwalk(parpath)
        if entryname not in pardir:
            raise ResourceNotFound(path)
        if self._is_dir(pardir[entryname]):
            raise FileExpected(path)
        pardir.pop(entryname)

    def removedir(self, path):
        """Remove the empty directory at *path*.

        Does nothing on a read-only filesystem.

        Raises:
            RemoveRootError: *path* names the root.
            ResourceNotFound: *path* does not exist.
            DirectoryExpected: *path* is not a directory.
            DirectoryNotEmpty: The directory still has entries.
        """
        if not self._writable:
            # raise ReadOnlyFilesystem
            return

        if path.strip("/") in ("", "."):
            raise RemoveRootError()

        parpath, entryname = self._split(path)
        pardir = self._pathwalk(parpath)
        if entryname not in pardir:
            raise ResourceNotFound(path)

        entry = pardir[entryname]
        if not self._is_dir(entry):
            raise DirectoryExpected(path)
        if entry:
            raise DirectoryNotEmpty(path)
        pardir.pop(entryname)


    def setinfo(self, path, info):
        """Store ``info['dcel']['value']`` as the value at *path*.

        This is a best-effort operation: it does nothing on a read-only
        filesystem, when the parent path cannot be resolved, or when *info*
        carries no ``dcel``/``value`` entry.
        """
        if not self._writable:
            # raise ReadOnlyFilesystem
            return
        try:
            parpath, entryname = self._split(path)
            target = self._pathwalk(parpath)
            target[entryname] = info['dcel']['value']
        except Exception:
            # Deliberately quiet, but no longer swallows KeyboardInterrupt
            # or SystemExit the way a bare ``except:`` did.
            return

    ### pyfilesystem:
    #   override methods

    def writetext(self,
                  path,
                  contents,
                  encoding='utf-8',
                  errors=None,
                  newline=''):
        """Store *contents* as the value at *path*.

        When the existing value is a ``Dcel`` the text is assigned to its
        ``value`` attribute; otherwise the entry is created or replaced.
        Failures while resolving or assigning are reported with ``print``
        and the call returns without raising.

        Args:
            path: Path of the entry to write.
            contents: Value to store.
            encoding, errors, newline: Accepted for API compatibility; not
                used.

        Raises:
            ResourceReadOnly: The filesystem is read-only.
        """
        if not self._writable:
            raise ResourceReadOnly(path)
        try:
            # walk to target
            parpath, entryname = self._split(path)
            target = self._pathwalk(parpath)
        except Exception:
            print("DictFS.writetext() error parpath, entryname, target=self._pathwalk(parpath)")
            return
        try:
            # set target
            try:
                current = target[entryname]
            except KeyError:
                # New entry: nothing to write through to, so create it.
                current = None
            if isinstance(current, Dcel):
                current.value = contents
            else:
                target[entryname] = contents
        except Exception:
            print("DictFS.writetext() error trying to set child")
            print(f"DictFS::writetext(): parpath: {parpath}, entryname: {entryname}, target: {repr(target)}")
            return

    ####################
    #  cosmos API      #
    ####################

    # Deprecate external use of any API other than pyfilesystem.
    # OK to keep if this is internal.

    def lookup(self, path, base=None):
        """Return the raw value at *path*, starting from *base* if given.

        Thin wrapper around ``_pathwalk``; raises ``ResourceNotFound`` when
        a path segment is missing.
        """
        return self._pathwalk(path, base)
        

In [22]:
# Smoke test: geturl(), getinfo() and readtext() on a nested string entry.
# geturl() and readtext() are not defined by DictFS itself; they come from
# the FS base class. The leaf values are the string "True", not the boolean.

s = DictFS({"a":{"apple":"True","apricot":"True","anchovy":"True"}})
print(s.geturl('/a/apple'))
print(s.getinfo('/a/apple').raw)
print(s.readtext('/a/apple'))

None
{'basic': {'name': 'apple', 'is_dir': False}}
True


In [2]:
# Test writethrough with recursive DictFS
from Dcel import Dcel
# Base layer: one string, carved into three slice cells.
baselayer = Dcel("apple a b c carrots")
apple = baselayer[0:5]      # "apple"
abc = baselayer[6:11]       # "a b c"
carrots = baselayer[12:]    # "carrots"
print(f"{apple} {abc} {carrots}")

# First DictFS layer: names mapped to the slice cells above.
based = Dcel({"apple": apple,
              "abc": abc,
              "carrots": carrots
             },
             service_class=DictFS
            )
print(based.service.fsdict)

# Second level of slicing: single characters taken from the "a b c" cell.
a = abc[0:1]
b = abc[2:3]
c = abc[4:5]
print(f"{a} {b} {c}")

# Second DictFS layer over the single-character cells.
abcd = Dcel({"a":a, "b":b, "c":c},
         service_class=DictFS)
print(abcd.service.fsdict)

# Assign through the top layer, then print both ends.
# NOTE(review): presumably the write should propagate down to baselayer;
# the recorded output below is truncated, so confirm by re-running.
abcd['b'] = 'balloon'
print(abcd)
print(baselayer)

Dcel::__init__() address=apple a b c carrots service=None args=None service_class=None
Dcel::__init__() address=slice(0, 5, None) service=apple a b c carrots args=None service_class=None
Dcel::__init__() address=slice(6, 11, None) service=apple a b c carrots args=None service_class=None
Dcel::__init__() address=slice(12, None, None) service=apple a b c carrots args=None service_class=None
apple a b c carrots
Dcel::__init__() address={'apple': <Dcel.Dcel object at 0x7fa4140fc1c0>, 'abc': <Dcel.Dcel object at 0x7fa41410ba60>, 'carrots': <Dcel.Dcel object at 0x7fa41410b070>} service=None args=None service_class=<class '__main__.DictFS'>
Dcel::__init__() service_class and address
Dcel::__init__() self.service=<__main__.DictFS object at 0x7fa4141903d0>
{'apple': <Dcel.Dcel object at 0x7fa4140fc1c0>, 'abc': <Dcel.Dcel object at 0x7fa41410ba60>, 'carrots': <Dcel.Dcel object at 0x7fa41410b070>}
Dcel::__init__() address=slice(0, 1, None) service=a b c args=None service_class=None
Dcel::__init__

In [18]:
# Flush the 'b' cell, then show its address and the service it belongs to.
abcd['b'].flush()
print(f"{abcd['b'].address} {abcd['b'].service}")

# Flush the cell stored as the value of 'b'.
# NOTE(review): the recorded output suggests this cascades through the
# underlying slice cells - confirm against the Dcel implementation.
abcd['b'].value.flush()


flushing /b <__main__.DictFS object at 0x7f123433af70>
/b <__main__.DictFS object at 0x7f123433af70>
flushing slice(2, 3, None) a balloonal
flushing slice(6, 11, None) apple a balloonalloon c carrots


In [20]:
# baselayer.value is a plain str here, which has no flush() method; this
# call raises the AttributeError recorded in the output.
baselayer.value.flush()

AttributeError: 'str' object has no attribute 'flush'

In [2]:
# Test dict with Dcels in the values.
from Dcel import Dcel

# NOTE: these three bindings are immediately rebound below and never used.
a = Dcel("apple")
b = a[1:3]  # test with Dcel slice
c = Dcel("carrot")

# Slice cells that all share one backing string.
stringbuf = Dcel("apple carrot")
a = stringbuf[0:5]
b = a[1:3]  # test with Dcel slice
c = stringbuf[6:12]
fsdict = { "a": a, "b": b, "c": c }

d = Dcel(fsdict,service_class=DictFS)
print(d["b"].address)
print(type(d["b"]))
d["b"] = "bb"  # this should access the slice

# Show every entry with its type, then the backing string.
for ea in d.listdir():
   print(f"{ea}: {d[ea]} {type(d[ea])}")

print(stringbuf)


/b
<class 'Dcel.Dcel'>
a: abble <class 'Dcel.Dcel'>
b: bb <class 'Dcel.Dcel'>
c: carrot <class 'Dcel.Dcel'>
abble carrot


In [13]:
# Test data shared by the cells that follow: one directory of string
# "files" and one directory holding a non-string value (the OSFS class).
from fs.osfs import OSFS
fsdict = { "colors":
          { "red": "Rose",
            "green": "Grapes",
            "blue": "Berries"
           },
           "services": { "file": OSFS }
         }

# Default mode 'a' makes this instance writable.
a = DictFS(fsdict)

In [16]:
# lookup(): resolve a relative path directly to the stored value.
a.lookup("colors/blue")

'Berries'

In [17]:
# exists(): not defined by DictFS, so this uses the FS base-class method.
a.exists("colors/blue")

True

In [5]:
# getinfo() on the root: the name is empty and is_dir is True.
a.getinfo('/').raw
#a.readtext('/')
#a._addr

{'basic': {'name': '', 'is_dir': True}}

In [6]:
# lookup() of a string entry returns the string itself.
a.lookup("colors/red")

'Rose'

In [7]:
# readtext() comes from the FS base class; DictFS only supplies openbin().
print(a.readtext("colors/red"))

Rose


In [8]:
# writetext() on an existing entry replaces the stored string.
a.writetext("colors/red","Fire Truck")

In [9]:
# remove() deletes a file entry from its parent dict.
a.remove("colors/red")

In [10]:
# Leading and trailing slashes are stripped during path resolution.
# 'red' is absent from the result because it was removed above.
a.listdir('///colors/')

['green', 'blue']

In [11]:
# Requesting the 'dcel' namespace adds the stored value itself to the raw info.
a.getinfo('/services/file',namespaces=['dcel']).raw

{'basic': {'name': 'file', 'is_dir': False}, 'dcel': {'value': fs.osfs.OSFS}}

In [9]:
# makedir() returns a SubFS; only the last call's result is displayed.
a.makedir('/colors/grey')
a.makedir('/colors/brown')

SubFS(<__main__.DictFS object at 0x119b01670>, '/colors/brown')

In [10]:
# removedir() succeeds because the directory created above is still empty.
a.removedir('colors/grey')

In [11]:
# Scratch check of the strip/split idiom used for path walking: a doubled
# slash inside the path leaves a leading '/' on the remainder.
"//red//green/blue".strip('/').split('/',1)

['red', '/green/blue']

In [12]:
# Scratch check of os.path helpers used to split parent path and entry name.
from os.path import basename, dirname

# A trailing slash makes basename() return '' (not displayed: only the
# last expression of a cell is shown).
basename("red/green//")
dirname("/blue/yellow")


'/blue'

In [13]:
# Scratch check: removing a key mutates the dict in place.
d = {1: 2, 3: 4}
del d[3]
d

{1: 2}

In [14]:
# Scratch check: an empty list is falsy, so this prints False.
l = []
print(bool(l))

False
