In [1]:
from Pyro5.api import expose, Daemon, locate_ns
from os import listdir
import threading
import pandas as pd
from os import path
import netifaces

In [2]:
@expose
class DataWarehouse(object):
    """Remote-callable queue of CSV files that are cleaned into one output file.

    On construction the names of all ``.csv`` files in ``dirpath`` are
    collected.  Callers take one name at a time with :meth:`removeItem`,
    then hand it back to :meth:`cleanseData`, which reads the file, applies
    ``dataCleanFunc`` and appends the result to ``outfile + '.csv'``.

    Parameters
    ----------
    dirpath : str
        Directory scanned (non-recursively) for ``.csv`` input files.
    outfile : str
        Output path without extension; ``'.csv'`` is appended.
    dataCleanFunc : callable
        Called with the DataFrame read from an input file; its return value
        must provide ``to_csv`` (e.g. a DataFrame).
    """

    def __init__(self, dirpath, outfile, dataCleanFunc):
        self.dirpath = dirpath
        self.contents = [f for f in listdir(dirpath) if f.endswith('.csv')]
        self.outfile = outfile + '.csv'
        self.dataCleanFunc = dataCleanFunc
        # True until the first successful append; decides whether the CSV
        # header row is emitted.
        self.headerNotWritten = True
        # The locks live on the instance so that every caller contends on the
        # SAME lock object.  A lock created inside the method call would be
        # private to that call and therefore protect nothing.
        self._contentsLock = threading.Lock()
        self._outfileLock = threading.Lock()

    def removeItem(self):
        """Pop and return one pending file name, or ``None`` when none remain."""
        with self._contentsLock:
            try:
                return self.contents.pop()
            except IndexError:
                return None

    def cleanseData(self, item):
        """Clean the input file ``item`` and append the result to the output CSV.

        ``item`` is expected to be a file name previously returned by
        :meth:`removeItem`; it is resolved relative to ``dirpath``.

        Any exception raised by ``pd.read_csv``, ``dataCleanFunc`` or
        ``to_csv`` propagates to the caller.
        """
        # NOTE(review): item is not validated; if it can come from untrusted
        # remote callers, consider restricting it to names from listdir.
        # path.join tolerates a dirpath with or without a trailing separator.
        df = pd.read_csv(path.join(self.dirpath, item))
        # Reading and cleaning happen outside the lock so that concurrent
        # callers only serialize on the shared output file.
        result = self.dataCleanFunc(df)
        with self._outfileLock:
            # The header decision and the append are one critical section:
            # exactly one writer emits the header and appended rows from
            # different callers cannot interleave.
            result.to_csv(self.outfile, mode='a',
                          header=self.headerNotWritten, index=False)
            # Cleared only after a successful write, so a failed first write
            # does not permanently lose the header row.
            self.headerNotWritten = False

In [3]:
def openDataWarehouse(dirpath, outfile, dataCleanFunc,
                      host=None, port=9092, name="example.datawarehouse"):
    """Create a DataWarehouse, publish it through Pyro and serve requests.

    The warehouse is registered with a Pyro daemon bound to ``host:port`` and
    its URI is registered under ``name`` with the Pyro name server found by
    ``locate_ns()``.  The call then enters the daemon's request loop and does
    not return until that loop ends.

    Parameters
    ----------
    dirpath, outfile, dataCleanFunc
        Forwarded unchanged to ``DataWarehouse``.
    host : str, optional
        Address the daemon binds to.  When ``None`` (default) the IPv4 address
        of the interface that carries the default gateway is used.
    port : int, optional
        Port the daemon binds to (default 9092).
    name : str, optional
        Logical name registered with the name server
        (default ``"example.datawarehouse"``).

    Raises
    ------
    KeyError
        During host auto-detection when there is no default IPv4 gateway or
        the chosen interface has no IPv4 address.
    """
    def defaultRouteIPv4():
        # gateways()['default'][AF_INET] is (gateway_address, interface_name);
        # index 1 picks the interface name.
        iface = netifaces.gateways()['default'][netifaces.AF_INET][1]
        # First IPv4 address configured on that interface.
        return netifaces.ifaddresses(iface)[netifaces.AF_INET][0]['addr']

    datawarehouse = DataWarehouse(dirpath, outfile, dataCleanFunc)
    if host is None:
        host = defaultRouteIPv4()

    with Daemon(host=host, port=port) as daemon:
        datawarehouse_uri = daemon.register(datawarehouse)
        # The name-server proxy is only needed for the registration itself,
        # so it is closed before the request loop starts.
        with locate_ns() as ns:
            ns.register(name, datawarehouse_uri)
        daemon.requestLoop()