Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Merge branch 'lukas/erts/efile_delayed_write_fix/OTP-10984' into maint

* lukas/erts/efile_delayed_write_fix/OTP-10984:
  Do driver_deq in worker threads instead of async_ready
  • Loading branch information...
commit e755c19236d19447efcf1809792e9c69ff970de0 2 parents 86626d3 + a73414d
Lukas Larsson authored March 27, 2013

Showing 1 changed file with 39 additions and 59 deletions. Show diff stats Hide diff stats

  1. 98  erts/emulator/drivers/common/efile_drv.c
98  erts/emulator/drivers/common/efile_drv.c
@@ -397,7 +397,6 @@ struct t_pwritev {
397 397
     ErlDrvPort         port;
398 398
     ErlDrvPDL          q_mtx;
399 399
     size_t             size;
400  
-    size_t             free_size;
401 400
     unsigned           cnt;
402 401
     unsigned           n;
403 402
     struct t_pbuf_spec specs[1];
@@ -462,7 +461,6 @@ struct t_data
462 461
 	    ErlDrvPort    port;
463 462
 	    ErlDrvPDL     q_mtx;
464 463
 	    size_t        size;
465  
-	    size_t        free_size;
466 464
 	    size_t        reply_size;
467 465
 	} writev;
468 466
 	struct t_pwritev pwritev;
@@ -1532,26 +1530,24 @@ static void invoke_writev(void *data) {
1532 1530
     }
1533 1531
     EF_FREE(iov);
1534 1532
 
1535  
-    d->c.writev.free_size = size;
1536  
-    d->c.writev.size -= size;
1537 1533
     if (! d->result_ok) {
1538 1534
 	d->again = 0;
  1535
+	MUTEX_LOCK(d->c.writev.q_mtx);
  1536
+	driver_deq(d->c.writev.port, d->c.writev.size);
  1537
+	MUTEX_UNLOCK(d->c.writev.q_mtx);
1539 1538
     } else {
1540 1539
 	if (! segment) {
1541 1540
 	    d->again = 0;
1542 1541
 	}
  1542
+	d->c.writev.size -= size;
1543 1543
 	TRACE_F(("w%lu", (unsigned long)size));
1544  
-
  1544
+	MUTEX_LOCK(d->c.writev.q_mtx);
  1545
+	driver_deq(d->c.writev.port, size);
  1546
+	MUTEX_UNLOCK(d->c.writev.q_mtx);
1545 1547
     }
1546  
-    DTRACE_INVOKE_RETURN(FILE_WRITE);
1547  
-}
1548 1548
 
1549  
-static void free_writev(void *data) {
1550  
-    struct t_data *d = data;
1551  
-    MUTEX_LOCK(d->c.writev.q_mtx);
1552  
-    driver_deq(d->c.writev.port, d->c.writev.size + d->c.writev.free_size);
1553  
-    MUTEX_UNLOCK(d->c.writev.q_mtx);
1554  
-    EF_FREE(d);
  1549
+
  1550
+    DTRACE_INVOKE_RETURN(FILE_WRITE);
1555 1551
 }
1556 1552
 
1557 1553
 static void invoke_pwd(void *data)
@@ -1602,7 +1598,7 @@ static void invoke_pwritev(void *data) {
1602 1598
     struct t_pwritev *c = &d->c.pwritev;
1603 1599
     size_t            p;
1604 1600
     int               segment;
1605  
-    size_t            size, write_size;
  1601
+    size_t            size, write_size, written;
1606 1602
     DTRACE_INVOKE_SETUP(FILE_PWRITEV);
1607 1603
 
1608 1604
     segment = d->again && c->size >= 2*FILE_SEGMENT_WRITE;
@@ -1622,23 +1618,19 @@ static void invoke_pwritev(void *data) {
1622 1618
 
1623 1619
     if (iovlen < 0)
1624 1620
 	goto error; /* Port terminated */
1625  
-    for (iovcnt = 0, c->free_size = 0;
1626  
-	 c->cnt < c->n && iovcnt < iovlen && c->free_size < size;
  1621
+    for (iovcnt = 0, written = 0;
  1622
+	 c->cnt < c->n && iovcnt < iovlen && written < size;
1627 1623
 	 c->cnt++) {
1628 1624
 	int chop;
1629 1625
 	write_size = c->specs[c->cnt].size;
1630 1626
 	if (iov[iovcnt].iov_len - p < write_size) {
1631  
-	    /* Mismatch between pos/size spec and what is queued */
1632  
-	    d->errInfo.posix_errno = EINVAL;
1633  
-	    d->result_ok = 0;
1634  
-	    d->again = 0;
1635  
-	    goto done;
  1627
+	    goto error;
1636 1628
 	}
1637  
-	chop = segment && c->free_size + write_size >= 2*FILE_SEGMENT_WRITE;
  1629
+	chop = segment && written + write_size >= 2*FILE_SEGMENT_WRITE;
1638 1630
 	if (chop) {
1639  
-	    ASSERT(c->free_size < FILE_SEGMENT_WRITE);
  1631
+	    ASSERT(written < FILE_SEGMENT_WRITE);
1640 1632
 	    write_size = FILE_SEGMENT_WRITE + FILE_SEGMENT_WRITE/2 
1641  
-		- c->free_size;
  1633
+		- written;
1642 1634
 	}
1643 1635
 	d->result_ok = efile_pwrite(&d->errInfo, (int) d->fd,
1644 1636
 				    (char *)(iov[iovcnt].iov_base) + p,
@@ -1646,15 +1638,15 @@ static void invoke_pwritev(void *data) {
1646 1638
 				    c->specs[c->cnt].offset);
1647 1639
 	if (! d->result_ok) {
1648 1640
 	    d->again = 0;
1649  
-	    goto done;
  1641
+	    goto deq_error;
1650 1642
 	}
1651  
-	c->free_size += write_size; 
  1643
+	written += write_size; 
1652 1644
 	c->size -= write_size;
1653 1645
 	if (chop) { 
1654 1646
 	    c->specs[c->cnt].offset += write_size;
1655 1647
 	    c->specs[c->cnt].size -= write_size;
1656 1648
 	    /* Schedule out (d->again != 0) */
1657  
-	    goto done;
  1649
+	    break;
1658 1650
 	}
1659 1651
 	/* Move forward in buffer */
1660 1652
 	p += write_size;
@@ -1676,25 +1668,28 @@ static void invoke_pwritev(void *data) {
1676 1668
 	    d->errInfo.posix_errno = EINVAL;
1677 1669
 	    d->result_ok = 0;
1678 1670
 	    d->again = 0;
  1671
+	deq_error:
  1672
+	    MUTEX_LOCK(d->c.writev.q_mtx);
  1673
+	    driver_deq(d->c.pwritev.port, c->size);
  1674
+	    MUTEX_UNLOCK(d->c.writev.q_mtx);
  1675
+
  1676
+	    goto done;
1679 1677
 	} else {
1680  
-	    ASSERT(c->free_size == size);
  1678
+	    ASSERT(written == size);
1681 1679
 	    d->again = 0;
1682 1680
 	}
1683  
-    }
  1681
+    } else
  1682
+      ASSERT(written == FILE_SEGMENT_WRITE);
  1683
+      
  1684
+    MUTEX_LOCK(d->c.writev.q_mtx);
  1685
+    driver_deq(d->c.pwritev.port, size);
  1686
+    MUTEX_UNLOCK(d->c.writev.q_mtx);
1684 1687
  done:
1685 1688
     EF_FREE(iov); /* Free our copy of the vector, nothing to restore */
  1689
+    
1686 1690
     DTRACE_INVOKE_RETURN(FILE_PWRITEV);
1687 1691
 }
1688 1692
 
1689  
-static void free_pwritev(void *data) {
1690  
-    struct t_data *d = data;
1691  
-
1692  
-    MUTEX_LOCK(d->c.writev.q_mtx);
1693  
-    driver_deq(d->c.pwritev.port, d->c.pwritev.free_size + d->c.pwritev.size);
1694  
-    MUTEX_UNLOCK(d->c.writev.q_mtx);
1695  
-    EF_FREE(d);
1696  
-}
1697  
-
1698 1693
 static void invoke_flstat(void *data)
1699 1694
 {
1700 1695
     struct t_data *d = (struct t_data *) data;
@@ -2008,21 +2003,8 @@ static void try_free_read_bin(file_descriptor *desc) {
2008 2003
 
2009 2004
 
2010 2005
 static int try_again(file_descriptor *desc, struct t_data *d) {
2011  
-    if (! d->again) {
  2006
+    if (! d->again)
2012 2007
 	return 0;
2013  
-    }
2014  
-    switch (d->command) {
2015  
-    case FILE_WRITE:
2016  
-	MUTEX_LOCK(d->c.writev.q_mtx);
2017  
-	driver_deq(d->c.writev.port, d->c.writev.free_size);
2018  
-	MUTEX_UNLOCK(d->c.writev.q_mtx);
2019  
-	break;
2020  
-    case FILE_PWRITEV:
2021  
-	MUTEX_LOCK(d->c.writev.q_mtx);
2022  
-	driver_deq(d->c.pwritev.port, d->c.pwritev.free_size);
2023  
-	MUTEX_UNLOCK(d->c.writev.q_mtx);
2024  
-	break;
2025  
-    }
2026 2008
     if (desc->timer_state != timer_idle) {
2027 2009
 	driver_cancel_timer(desc->port);
2028 2010
     }
@@ -2078,10 +2060,9 @@ static struct t_data *async_write(file_descriptor *desc, int *errp,
2078 2060
     }
2079 2061
 #endif
2080 2062
     d->reply = reply;
2081  
-    d->c.writev.free_size = 0;
2082 2063
     d->c.writev.reply_size = reply_size;
2083 2064
     d->invoke = invoke_writev;
2084  
-    d->free = free_writev;
  2065
+    d->free = free_data;
2085 2066
     d->level = 1;
2086 2067
     cq_enq(desc, d);
2087 2068
     desc->write_buffered = 0;
@@ -2394,7 +2375,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
2394 2375
 		  desc->write_errInfo = d->errInfo;
2395 2376
 	      }
2396 2377
 	  }
2397  
-	  free_writev(data);
  2378
+	  free_data(data);
2398 2379
 	  break;
2399 2380
       case FILE_LSEEK:
2400 2381
 	  if (d->reply) {
@@ -2524,7 +2505,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
2524 2505
 	  } else {
2525 2506
 	      reply_Uint(desc, d->c.pwritev.n);
2526 2507
 	  }
2527  
-	  free_pwritev(data);
  2508
+	  free_data(data);
2528 2509
 	  break;
2529 2510
       case FILE_PREADV:
2530 2511
 	  if (!d->result_ok) {
@@ -2581,7 +2562,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
2581 2562
     }
2582 2563
     DTRACE6(efile_drv_return, sched_i1, sched_i2, sched_utag,
2583 2564
             command, result_ok, posix_errno);
2584  
-    if (desc->write_buffered != 0 && desc->timer_state == timer_idle) {
  2565
+    if (desc->write_buffered != 0 && desc->timer_state == timer_idle ) {
2585 2566
 	desc->timer_state = timer_write;
2586 2567
 	driver_set_timer(desc->port, desc->write_delay);
2587 2568
     }
@@ -3651,7 +3632,6 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
3651 3632
 #ifdef USE_VM_PROBES
3652 3633
 	dt_i3 = d->c.pwritev.size;
3653 3634
 #endif
3654  
-	d->c.pwritev.free_size = 0;
3655 3635
 	if (j == 0) {
3656 3636
 	    /* Trivial case - nothing to write */
3657 3637
 	    EF_FREE(d);
@@ -3675,7 +3655,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
3675 3655
 		MUTEX_UNLOCK(desc->q_mtx);
3676 3656
 		/* Execute the command */
3677 3657
 		d->invoke = invoke_pwritev;
3678  
-		d->free = free_pwritev;
  3658
+		d->free = free_data;
3679 3659
 		d->level = 1;
3680 3660
 		cq_enq(desc, d);
3681 3661
 	    }

0 notes on commit e755c19

Please sign in to comment.
Something went wrong with that request. Please try again.