diff --git a/Siphon/Monitors/DataMonitor.vb b/Siphon/Monitors/DataMonitor.vb index d358402..f29c6a9 100644 --- a/Siphon/Monitors/DataMonitor.vb +++ b/Siphon/Monitors/DataMonitor.vb @@ -1,4 +1,5 @@ -Imports System.Timers +Imports System.Collections.ObjectModel +Imports System.Timers Imports log4net ''' @@ -11,6 +12,7 @@ Public MustInherit Class DataMonitor Private Shared ReadOnly Log As ILog = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod.DeclaringType) Private _disposed As Boolean = False Private _name As String = String.Empty + Private _processing As Boolean = False Private _processor As IDataProcessor = Nothing Private _schedule As IMonitorSchedule = Nothing Private WithEvents _timer As System.Timers.Timer @@ -18,10 +20,12 @@ Public MustInherit Class DataMonitor ''' ''' Creates a new DataMonitor instance. ''' + ''' String. The friendly name for the monitor. ''' IDataProcessor. The processor to use to handle newly found data. ''' IMonitorSchedule. The schedule used to detemrin when to look for new data. ''' - Protected Sub New(ByVal schedule As IMonitorSchedule, ByVal processor As IDataProcessor) + Protected Sub New(ByVal name As String, ByVal schedule As IMonitorSchedule, ByVal processor As IDataProcessor) + Me.Name = name.Trim Me.Schedule = schedule Me.Processor = processor End Sub @@ -32,7 +36,7 @@ Public MustInherit Class DataMonitor ''' ''' String ''' - Public Property Name() As String Implements IDataMonitor.Name + Public Overridable Property Name() As String Implements IDataMonitor.Name Get Return _name End Get @@ -41,6 +45,18 @@ Public MustInherit Class DataMonitor End Set End Property + ''' + ''' Gets flag indicating if the current monitor is processing new data. + ''' + ''' + ''' Boolean. True if the monitor is processing new data. False otherwise. + ''' + Public Overridable ReadOnly Property Processing() As Boolean + Get + Return _processing + End Get + End Property + ''' ''' Gets/sets the processor to handle newly found data. ''' @@ -60,7 +76,7 @@ Public MustInherit Class DataMonitor ''' Scans for new data and sends new data to the current processor. ''' ''' - Public MustOverride Sub Scan() Implements IDataMonitor.Scan + Public MustOverride Function Scan() As Collection(Of Object) Implements IDataMonitor.Scan ''' ''' Starts monitoring for new data. @@ -69,13 +85,28 @@ Public MustInherit Class DataMonitor Public Overridable Sub Start() Implements IDataMonitor.Start Log.InfoFormat("Starting Monitor {0}", Me.Name) - Dim start As DateTime = DateTime.Now - Dim nextEvent As DateTime = Me.Schedule.NextEvent(start) + Timer.Interval = Me.NextInterval + Timer.Start() + End Sub - Log.DebugFormat("Monitor Start Time {0}", start) - Log.DebugFormat("Next Monitor Scan {0}", nextEvent) + ''' + ''' Pauses data monitoring, usually while processing files. + ''' + ''' + Public Overridable Sub Pause() Implements IDataMonitor.Pause + Log.InfoFormat("Pausing Monitor {0}", Me.Name) + + Timer.Stop() + End Sub - Timer.Interval = (nextEvent - start).TotalMilliseconds + ''' + ''' Resumes data monitors, usually after processing new data. + ''' + ''' + Public Overridable Sub [Resume]() Implements IDataMonitor.Resume + Log.InfoFormat("Resuming Monitor {0}", Me.Name) + + Timer.Interval = Me.NextInterval Timer.Start() End Sub @@ -84,9 +115,15 @@ Public MustInherit Class DataMonitor ''' ''' Public Overridable Sub [Stop]() Implements IDataMonitor.Stop - Log.InfoFormat("Stopping Monitor {0}", Me.Name) + If Timer.Enabled Then + Log.InfoFormat("Stopping Monitor {0}", Me.Name) - Timer.Stop() + If Me.Processing Then + Log.Debug("Waiting for processor to finish") + End If + + Timer.Stop() + End If End Sub ''' @@ -145,9 +182,42 @@ Public MustInherit Class DataMonitor GC.SuppressFinalize(Me) End Sub + ''' + ''' Gets the next event from the schedule and sets the timers interval. + ''' + ''' + ''' + Private Function NextInterval() As Integer + Dim start As DateTime = DateTime.Now + Dim nextEvent As DateTime = Me.Schedule.NextEvent(start) + Dim interval As Integer = (nextEvent - start).TotalMilliseconds + + Log.DebugFormat("Monitor Start Time {0}", start) + Log.DebugFormat("Next Monitor Scan {0}", nextEvent) + Log.DebugFormat("Timer Interval {0}", interval) + + Return interval + End Function + Private Sub _timer_Elapsed(ByVal sender As Object, ByVal e As System.Timers.ElapsedEventArgs) Handles _timer.Elapsed - Me.Stop() - Me.Scan() - Me.Start() + Me.Pause() + + Try + Dim items As Collection(Of Object) = Me.Scan + If items.Count > 0 Then + _processing = True + Log.DebugFormat("{0} {1}", Me.Name, _processing) + For Each item As Object In items + Me.Processor.Process(item) + Next + Log.Debug("POST Process") + + _processing = False + End If + Catch ex As Exception + 'Log.Debug("BOOM", ex) + End Try + + Me.Resume() End Sub End Class diff --git a/Siphon/Monitors/DirectoryMonitor.vb b/Siphon/Monitors/DirectoryMonitor.vb index 07c5995..1252b98 100644 --- a/Siphon/Monitors/DirectoryMonitor.vb +++ b/Siphon/Monitors/DirectoryMonitor.vb @@ -1,46 +1,52 @@ -Imports System.IO -Imports log4net +Imports log4net ''' ''' Monitors a directory for new files. ''' ''' -Public Class DirectoryMonitor +Public MustInherit Class DirectoryMonitor Inherits DataMonitor Implements IDirectoryMonitor Private Shared ReadOnly Log As ILog = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod.DeclaringType) Private Const DEFAULT_FILTER As String = "*" + Private _createMissingFolders As Boolean = False Private _filter As String = DEFAULT_FILTER Private _path As String = String.Empty ''' ''' Creates a new directory monitor. ''' + ''' String. The friendly name for the monitor. ''' String. The full path to the directory to be monitored. ''' IMonitorSchedule. The schedule used to monitor the directory. ''' IDataProcessor. The data processor to use to process new files. ''' - Public Sub New(ByVal path As String, ByVal schedule As IMonitorSchedule, ByVal processor As IDataProcessor) - MyBase.New(schedule, processor) + Public Sub New(ByVal name As String, ByVal path As String, ByVal schedule As IMonitorSchedule, ByVal processor As IDataProcessor) + MyBase.New(name, schedule, processor) Me.Path = path End Sub ''' - ''' Scans the specified directory for new files mathcing the specified filter. + ''' Creates missing folders before starting the timer. ''' ''' - Public Overrides Sub Scan() - Log.DebugFormat("Scanning {0} for {1}", Me.Path, Me.Filter) + Public MustOverride Sub CreateFolders() - Dim files() As String = Directory.GetFiles(Me.Path, Me.Filter, SearchOption.TopDirectoryOnly) - Log.DebugFormat("Found {0} files in {1}", files.Length, Me.Path) - - For Each file As String In files - Log.DebugFormat("Processing {0}", file) - Me.Processor.Process(file) - Next - End Sub + ''' + ''' Gets/sets flag determining whether to create any missing folders when starting the monitor. + ''' + ''' + ''' Boolean + ''' + Public Overridable Property CreateMissingFolders() As Boolean + Get + Return _createMissingFolders + End Get + Set(ByVal value As Boolean) + _createMissingFolders = value + End Set + End Property ''' ''' Gets/sets the file filter to apply to the directory. @@ -75,4 +81,12 @@ Public Class DirectoryMonitor _path = value.Trim End Set End Property + + Public Overrides Sub Start() + If Me.CreateMissingFolders Then + Me.CreateFolders() + End If + + MyBase.Start() + End Sub End Class diff --git a/Siphon/Monitors/FtpDirectoryMonitor.vb b/Siphon/Monitors/FtpDirectoryMonitor.vb new file mode 100644 index 0000000..5d57042 --- /dev/null +++ b/Siphon/Monitors/FtpDirectoryMonitor.vb @@ -0,0 +1,42 @@ +Imports System.Collections.ObjectModel +Imports System.IO +Imports log4net + +Public Class FtpDirectoryMonitor + Inherits DirectoryMonitor + + Private Shared ReadOnly Log As ILog = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod.DeclaringType) + + ''' + ''' Creates a new directory monitor. + ''' + ''' String. The friendly name for the monitor. + ''' String. The full path to the directory to be monitored. + ''' IMonitorSchedule. The schedule used to monitor the directory. + ''' IDataProcessor. The data processor to use to process new files. + ''' + Public Sub New(ByVal name As String, ByVal path As String, ByVal schedule As IMonitorSchedule, ByVal processor As IDataProcessor) + MyBase.New(name, path, schedule, processor) + End Sub + + ''' + ''' Create any missing folders during start. + ''' + ''' + Public Overrides Sub CreateFolders() + + End Sub + + ''' + ''' Scans the specified directory for new files mathcing the specified filter. + ''' + ''' + Public Overrides Function Scan() As Collection(Of Object) + Log.DebugFormat("Scanning {0} for {1}", Me.Path, Me.Filter) + + Dim files() As String = Directory.GetFiles(Me.Path, Me.Filter, SearchOption.TopDirectoryOnly) + Log.DebugFormat("Found {0} files in {1}", files.Length, Me.Path) + + Return New System.Collections.ObjectModel.Collection(Of Object)(files) + End Function +End Class diff --git a/Siphon/Monitors/IDataMonitor.vb b/Siphon/Monitors/IDataMonitor.vb index a2dcba7..ef82db6 100644 --- a/Siphon/Monitors/IDataMonitor.vb +++ b/Siphon/Monitors/IDataMonitor.vb @@ -1,4 +1,6 @@ -''' +Imports System.Collections.ObjectModel + +''' ''' Interface that defines a data monitoring instance. ''' ''' @@ -25,6 +27,18 @@ Public Interface IDataMonitor ''' Sub [Stop]() + ''' + ''' Pauses data monitoring, usually while processing files. + ''' + ''' + Sub Pause() + + ''' + ''' Resumes data monitors, usually after processing new data. + ''' + ''' + Sub [Resume]() + ''' ''' Gets/sets the data processor to use when new data is found. ''' @@ -45,5 +59,5 @@ Public Interface IDataMonitor ''' Scans for new data and sends new data to the current processor. ''' ''' - Sub Scan() + Function Scan() As Collection(Of Object) End Interface diff --git a/Siphon/Monitors/LocalDirectoryMonitor.vb b/Siphon/Monitors/LocalDirectoryMonitor.vb new file mode 100644 index 0000000..02461ac --- /dev/null +++ b/Siphon/Monitors/LocalDirectoryMonitor.vb @@ -0,0 +1,48 @@ +Imports System.Collections.ObjectModel +Imports System.IO +Imports log4net + +Public Class LocalDirectoryMonitor + Inherits DirectoryMonitor + + Private Shared ReadOnly Log As ILog = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod.DeclaringType) + + ''' + ''' Creates a new directory monitor. + ''' + ''' String. The friendly name for the monitor. + ''' String. The full path to the directory to be monitored. + ''' IMonitorSchedule. The schedule used to monitor the directory. + ''' IDataProcessor. The data processor to use to process new files. + ''' + Public Sub New(ByVal name As String, ByVal path As String, ByVal schedule As IMonitorSchedule, ByVal processor As IDataProcessor) + MyBase.New(name, path, schedule, processor) + End Sub + + ''' + ''' Create any missing folders during start. + ''' + ''' + Public Overrides Sub CreateFolders() + If Me.CreateMissingFolders Then + If Not Directory.Exists(Me.Path) Then + Log.DebugFormat("Creating directory {0}", Me.Path) + + Directory.CreateDirectory(Me.Path) + End If + End If + End Sub + + ''' + ''' Scans the specified directory for new files mathcing the specified filter. + ''' + ''' + Public Overrides Function Scan() As Collection(Of Object) + Log.DebugFormat("Scanning {0} for {1}", Me.Path, Me.Filter) + + Dim files() As String = Directory.GetFiles(Me.Path, Me.Filter, SearchOption.TopDirectoryOnly) + Log.DebugFormat("Found {0} files in {1}", files.Length, Me.Path) + + Return New System.Collections.ObjectModel.Collection(Of Object)(files) + End Function +End Class diff --git a/Siphon/Schedules/DailySchedule.vb b/Siphon/Schedules/DailySchedule.vb index 22bebc6..3c67ea0 100644 --- a/Siphon/Schedules/DailySchedule.vb +++ b/Siphon/Schedules/DailySchedule.vb @@ -51,7 +51,10 @@ Public Class DailySchedule If tp > max Then Log.WarnFormat("Skipping TimeSpan greater than 24 hours {0}", tp) Else - If tp >= start.TimeOfDay Then + Log.DebugFormat("Time Span {0}", tp.ToString) + Log.DebugFormat("Start TimeOfDay {0}", start.TimeOfDay) + + If tp >= start.TimeOfDay And (tp - start.TimeOfDay).TotalSeconds >= 1 Then Return New DateTime(start.Year, start.Month, start.Day, tp.Hours, tp.Minutes, tp.Seconds) End If End If diff --git a/Siphon/Siphon.vbproj b/Siphon/Siphon.vbproj index 8cb8d58..95037ab 100644 --- a/Siphon/Siphon.vbproj +++ b/Siphon/Siphon.vbproj @@ -55,7 +55,9 @@ + + diff --git a/SiphonTests/MonitorTests.vb b/SiphonTests/MonitorTests.vb index 187015d..d0c08e9 100644 --- a/SiphonTests/MonitorTests.vb +++ b/SiphonTests/MonitorTests.vb @@ -10,24 +10,133 @@ Public Class MonitorTests log4net.Config.XmlConfigurator.Configure() End Sub -#Region "Directory Monitor Tests" +#Region "Local Directory Monitor Tests" - _ + _ Public Sub DirectoryMonitor() - Dim temp As DirectoryInfo = Directory.CreateDirectory(Path.Combine(Path.GetTempPath, Path.GetRandomFileName)) - Dim f = File.Create(Path.Combine(temp.FullName, Path.GetRandomFileName)) - f.Dispose() - - Using monitor As IDataMonitor = New DirectoryMonitor(temp.FullName, New IntervalSchedule(5), New MockProcessor) - monitor.Name = "TestDirectoryMonitor" - monitor.Start() - Threading.Thread.Sleep(20000) - monitor.Stop() + Dim tempdir As DirectoryInfo = Directory.CreateDirectory(Path.Combine(Path.GetTempPath, Path.GetRandomFileName)) + File.Create(Path.Combine(tempdir.FullName, "SUCCESS")) + + Using schedule = New DailySchedule(DateTime.Now.AddSeconds(2).TimeOfDay) + Using processor = New MockProcessor + Using monitor As IDataMonitor = New LocalDirectoryMonitor("LocalMonitor", tempdir.FullName, schedule, processor) + monitor.Start() + Threading.Thread.Sleep(3000) + monitor.Stop() + + Assert.AreEqual(1, processor.Count, "Has processed 1 file") + End Using + End Using + End Using + + tempdir.Delete(True) + End Sub + + _ + Public Sub DirectoryMonitorProcessorFailure() + Dim tempdir As DirectoryInfo = Directory.CreateDirectory(Path.Combine(Path.GetTempPath, Path.GetRandomFileName)) + File.Create(Path.Combine(tempdir.FullName, "FAILURE")) + + Using schedule = New DailySchedule(DateTime.Now.AddSeconds(2).TimeOfDay) + Using processor = New MockProcessor + Using monitor As IDataMonitor = New LocalDirectoryMonitor("LocalMonitor", tempdir.FullName, schedule, processor) + monitor.Start() + Threading.Thread.Sleep(3000) + monitor.Stop() + + Assert.AreEqual(1, processor.Count, "Has processed 1 file") + End Using + End Using + End Using + + tempdir.Delete(True) + End Sub + + _ + Public Sub DirectoryMonitorProcessorException() + Dim tempdir As DirectoryInfo = Directory.CreateDirectory(Path.Combine(Path.GetTempPath, Path.GetRandomFileName)) + File.Create(Path.Combine(tempdir.FullName, "EXCEPTION")) + + Using schedule = New DailySchedule(DateTime.Now.AddSeconds(2).TimeOfDay) + Using processor = New MockProcessor + Using monitor As IDataMonitor = New LocalDirectoryMonitor("LocalMonitor", tempdir.FullName, schedule, processor) + monitor.Start() + Threading.Thread.Sleep(3000) + monitor.Stop() + + Assert.AreEqual(1, processor.Count, "Has processed 1 file") + End Using + End Using + End Using + + tempdir.Delete(True) + End Sub + + _ + Public Sub DirectoryMonitorCreateDirectory() + Dim tempdir As String = Path.Combine(Path.GetTempPath, Path.GetRandomFileName) + + Using schedule = New DailySchedule(DateTime.Now.AddSeconds(1).TimeOfDay) + Using processor = New MockProcessor + Using monitor As IDataMonitor = New LocalDirectoryMonitor("LocalMonitor", tempdir, schedule, processor) + Assert.IsFalse(Directory.Exists(tempdir), "Monitor path doesn't exist") + + DirectCast(monitor, LocalDirectoryMonitor).CreateMissingFolders = True + monitor.Start() + monitor.Stop() + + Assert.IsTrue(Directory.Exists(tempdir), "Monitor path exista") + End Using + End Using End Using - temp.Delete(True) + Directory.Delete(tempdir, True) End Sub + _ + Public Sub DirectoryMonitorWithFilter() + Dim tempdir As DirectoryInfo = Directory.CreateDirectory(Path.Combine(Path.GetTempPath, Path.GetRandomFileName)) + File.Create(Path.Combine(tempdir.FullName, "SUCCESS")) + File.Create(Path.Combine(tempdir.FullName, "test.csv")) + + Using schedule = New DailySchedule(DateTime.Now.AddSeconds(2).TimeOfDay) + Using processor = New MockProcessor + Using monitor As IDataMonitor = New LocalDirectoryMonitor("LocalMonitor", tempdir.FullName, schedule, processor) + DirectCast(monitor, LocalDirectoryMonitor).Filter = "*.csv" + + monitor.Start() + Threading.Thread.Sleep(3000) + monitor.Stop() + + Assert.AreEqual(1, processor.Count, "Has processed 1 files") + End Using + End Using + End Using + + tempdir.Delete(True) + End Sub + + + _ + Public Sub DirectoryMonitorStillProcessing() + Dim tempdir As DirectoryInfo = Directory.CreateDirectory(Path.Combine(Path.GetTempPath, Path.GetRandomFileName)) + File.Create(Path.Combine(tempdir.FullName, "SUCCESS")) + + Using schedule = New DailySchedule(DateTime.Now.AddSeconds(2).TimeOfDay) + Using processor = New MockProcessor + Using monitor As IDataMonitor = New LocalDirectoryMonitor("LocalMonitor", tempdir.FullName, schedule, processor) + processor.DelayProcess = 10 + monitor.Start() + Threading.Thread.Sleep(3000) + monitor.Stop() + + Assert.AreEqual(1, processor.Count, "Has processed 1 files") + End Using + End Using + End Using + + tempdir.Delete(True) + End Sub #End Region End Class diff --git a/SiphonTests/Processors/MockProcessor.vb b/SiphonTests/Processors/MockProcessor.vb index 9aa6f22..d978e50 100644 --- a/SiphonTests/Processors/MockProcessor.vb +++ b/SiphonTests/Processors/MockProcessor.vb @@ -1,4 +1,5 @@ -Imports log4net +Imports System.IO +Imports log4net Imports ChrisLaco.Siphon Public Class MockProcessor @@ -6,7 +7,45 @@ Public Class MockProcessor Private Shared ReadOnly Log As ILog = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod.DeclaringType) + Private _count As Integer = 0 + Private _delayProcess As Integer = 0 + Public Overrides Function Process(ByVal data As Object) As Boolean - Log.Debug("MockProcessor.Process") + Log.DebugFormat("MockProcessor.Process {0}", data.ToString) + _count += 1 + + Dim info As FileInfo = New FileInfo(data) + + Log.DebugFormat("Delay process {0}", Me.DelayProcess) + + Log.DebugFormat("Delay process {0}", Me.DelayProcess) + + Select Case info.Name.ToUpper + Case "SUCCESS" + Return True + Case "FAILURE" + Return False + Case "EXCEPTION" + Throw New Exception + End Select End Function + + Public ReadOnly Property Count() As Integer + Get + Return _count + End Get + End Property + + Public Property DelayProcess() As Integer + Get + Return _delayProcess + End Get + Set(ByVal value As Integer) + _delayProcess = value + End Set + End Property + + Public Sub Reset() + _count = 0 + End Sub End Class diff --git a/SiphonTests/ScheduleTests.vb b/SiphonTests/ScheduleTests.vb index b8f5bbb..ae5f52f 100644 --- a/SiphonTests/ScheduleTests.vb +++ b/SiphonTests/ScheduleTests.vb @@ -54,7 +54,7 @@ Public Class ScheduleTests start = DateTime.Parse("1/1/2001 4:00 AM") ndt = schedule.NextEvent(start) - Assert.AreEqual(4, ndt.Hour, "Got hour 4") + Assert.AreEqual(12, ndt.Hour, "Got hour 12") Assert.AreEqual(start.Date, ndt.Date, "Got same date") start = DateTime.Parse("1/1/2001 4:10 AM") diff --git a/SiphonTests/app.config b/SiphonTests/app.config index 07ed724..640f3a9 100644 --- a/SiphonTests/app.config +++ b/SiphonTests/app.config @@ -6,7 +6,7 @@ - +