Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Various changes resulting from Steve Singer's review:

1.  INITIAL_STACK_SIZE from #define to static int

2.  Get rid of #defines of begin/commit, in favor of slon_dstrings

3.  Add an initial query, at monitor thread start time, which
    purges old data out of sl_components.

    We removed the other way of cleaning it out, but surely still
    need a way.

4.  Some memory allocation fixes

5.  stack_dump() now locks the stack_lock mutex, entry_dump() comments
    warn that users of it need to take that lock.

6.  Annotate parms of monitor_state() with "const" as needful
  • Loading branch information...
commit d00574d2d1793a372467bc103a7772030fd939ab 1 parent 03f52da
authored March 15, 2011
61  src/slon/monitor_thread.c
@@ -26,6 +26,7 @@ static void stack_init(void);
26 26
 static bool stack_pop(SlonState * current);
27 27
 static void stack_dump();
28 28
 static void entry_dump(int i, SlonState * tos);
  29
+static int initial_stack_size=6;
29 30
 
30 31
 /* ----------
31 32
  * Global variables
@@ -48,7 +49,7 @@ void *
48 49
 monitorThread_main(void *dummy)
49 50
 {
50 51
 	SlonConn   *conn;
51  
-	SlonDString beginquery;
  52
+	SlonDString beginquery, commitquery;
52 53
 	SlonDString monquery;
53 54
 
54 55
 	PGconn	   *dbconn;
@@ -74,6 +75,26 @@ monitorThread_main(void *dummy)
74 75
 		dbconn = conn->dbconn;
75 76
 		slon_log(SLON_DEBUG2, "monitorThread: setup DB conn\n");
76 77
 
  78
+		/* Start by emptying the sl_components table */
  79
+		dstring_init(&monquery);
  80
+		slon_mkquery(&monquery,
  81
+					 "delete from %s.sl_components;",
  82
+					 rtcfg_namespace);
  83
+
  84
+		res = PQexec(dbconn, dstring_data(&monquery));
  85
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
  86
+		{
  87
+			slon_log(SLON_FATAL,
  88
+					 "monitorThread: \"%s\" - %s",
  89
+					 dstring_data(&monquery), PQresultErrorMessage(res));
  90
+			PQclear(res);
  91
+			dstring_free(&monquery);
  92
+			slon_retry();
  93
+		} else {
  94
+			PQclear(res);
  95
+			dstring_free(&monquery);
  96
+		}
  97
+		
77 98
 		monitor_state("local_monitor", 0, (pid_t) conn->conn_pid, "thread main loop", 0, "n/a");
78 99
 
79 100
 		/*
@@ -83,6 +104,9 @@ monitorThread_main(void *dummy)
83 104
 		slon_mkquery(&beginquery,
84 105
 					 "start transaction;");
85 106
 
  107
+		dstring_init(&commitquery);
  108
+		slon_mkquery(&commitquery, "commit;");
  109
+
86 110
 		while ((rc = (ScheduleStatus) sched_wait_time(conn, SCHED_WAIT_SOCK_READ, monitor_interval) == SCHED_STATUS_OK))
87 111
 		{
88 112
 			int			qlen;
@@ -92,13 +116,12 @@ monitorThread_main(void *dummy)
92 116
 			pthread_mutex_unlock(&stack_lock);
93 117
 			if (qlen >= 0)
94 118
 			{
95  
-#define BEGINQUERY "start transaction;"
96  
-				res = PQexec(dbconn, BEGINQUERY);
  119
+				res = PQexec(dbconn, dstring_data(&beginquery));
97 120
 				if (PQresultStatus(res) != PGRES_COMMAND_OK)
98 121
 				{
99 122
 					slon_log(SLON_FATAL,
100 123
 							 "monitorThread: \"%s\" - %s",
101  
-							 BEGINQUERY, PQresultErrorMessage(res));
  124
+							 dstring_data(&beginquery), PQresultErrorMessage(res));
102 125
 					PQclear(res);
103 126
 					slon_retry();
104 127
 					break;
@@ -168,16 +191,18 @@ monitorThread_main(void *dummy)
168 191
 					PQclear(res);
169 192
 				}
170 193
 
171  
-#define COMMITQUERY "commit;"
172  
-				res = PQexec(dbconn, COMMITQUERY);
  194
+				res = PQexec(dbconn, dstring_data(&commitquery));
173 195
 				if (PQresultStatus(res) != PGRES_COMMAND_OK)
174 196
 				{
175 197
 					slon_log(SLON_FATAL,
176 198
 							 "monitorThread: %s - %s\n",
177  
-							 COMMITQUERY,
  199
+							 dstring_data(&commitquery),
178 200
 							 PQresultErrorMessage(res));
179 201
 					PQclear(res);
  202
+					dstring_free(&monquery);
180 203
 					slon_retry();
  204
+				} else {
  205
+					dstring_free(&monquery);
181 206
 				}
182 207
 
183 208
 			}
@@ -191,7 +216,7 @@ monitorThread_main(void *dummy)
191 216
 	slon_log(SLON_CONFIG, "monitorThread: exit main loop\n");
192 217
 
193 218
 	dstring_free(&beginquery);
194  
-	dstring_free(&monquery);
  219
+	dstring_free(&commitquery);
195 220
 	slon_disconnectdb(conn);
196 221
 
197 222
 	slon_log(SLON_INFO, "monitorThread: thread done\n");
@@ -199,11 +224,10 @@ monitorThread_main(void *dummy)
199 224
 	return (void *) 0;
200 225
 }
201 226
 
202  
-#define INITIAL_STACK_SIZE 6
203 227
 static void
204 228
 stack_init(void)
205 229
 {
206  
-	stack_maxlength = INITIAL_STACK_SIZE;
  230
+	stack_maxlength = initial_stack_size;
207 231
 	mstack = malloc(sizeof(SlonState) * (stack_maxlength + 1));
208 232
 	if (mstack == NULL)
209 233
 	{
@@ -218,7 +242,7 @@ stack_init(void)
218 242
 }
219 243
 
220 244
 void
221  
-monitor_state(char *actor, int node, pid_t conn_pid, /* @null@ */ char *activity, int64 event, /* @null@ */ char *event_type)
  245
+monitor_state(const char *actor, int node, pid_t conn_pid, /* @null@ */ const char *activity, int64 event, /* @null@ */ const char *event_type)
222 246
 {
223 247
 	size_t		len;
224 248
 	SlonState  *tos;
@@ -300,7 +324,7 @@ monitor_state(char *actor, int node, pid_t conn_pid, /* @null@ */ char *activity
300 324
 		if (ns)
301 325
 		{
302 326
 			strncpy(ns, actor, len);
303  
-			ns[len] = (char) 0;
  327
+			ns[len] = '\0';
304 328
 			tos->actor = ns;
305 329
 		}
306 330
 		else
@@ -390,14 +414,6 @@ stack_pop( /* @out@ */ SlonState * qentry)
390 414
 	}
391 415
 }
392 416
 
393  
-/*
394  
- * Local Variables:
395  
- *	tab-width: 4
396  
- *	c-indent-level: 4
397  
- *	c-basic-offset: 4
398  
- * End:
399  
- */
400  
-
401 417
 static void
402 418
 stack_dump()
403 419
 {
@@ -405,14 +421,19 @@ stack_dump()
405 421
 	SlonState  *tos;
406 422
 
407 423
 	slon_log(SLON_DEBUG2, "monitorThread: stack_dump()\n");
  424
+	pthread_mutex_lock(&stack_lock);
408 425
 	for (i = 0; i < stack_size; i++)
409 426
 	{
410 427
 		tos = mstack + i;
411 428
 		entry_dump(i, tos);
412 429
 	}
413 430
 	slon_log(SLON_DEBUG2, "monitorThread: stack_dump done\n");
  431
+	pthread_mutex_unlock(&stack_lock);
414 432
 }
415 433
 
  434
+/* Note that this function accesses stack contents, and thus needs to
  435
+ * be guarded by the pthread mutex on stack_lock */
  436
+
416 437
 static void
417 438
 entry_dump(int i, SlonState * tos)
418 439
 {
2  src/slon/slon.h
@@ -520,7 +520,7 @@ extern void *syncThread_main(void *dummy);
520 520
  * ----------
521 521
  */
522 522
 extern void *monitorThread_main(void *dummy);
523  
-extern void monitor_state (char *actor, int node, pid_t conn_pid, char *activity, int64 event, char *event_type);
  523
+extern void monitor_state (const char *actor, int node, pid_t conn_pid, const char *activity, int64 event, const char *event_type);
524 524
 
525 525
 /* ----------
526 526
  * Globals in monitor_thread.c

0 notes on commit d00574d

Please sign in to comment.
Something went wrong with that request. Please try again.