Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

merge new dht code

  • Loading branch information...
commit 961fd20039c15c6dcb4dbb3a9e75de0267a563fd 1 parent 42cc4fc
@isdal isdal authored
Showing with 9,868 additions and 3,452 deletions.
  1. +1 −1  .gitignore
  2. +64 −7 az_src/src/com/aelitis/azureus/core/dht/DHT.java
  3. +5 −3 az_src/src/com/aelitis/azureus/core/dht/DHTOperationAdapter.java
  4. +4 −2 az_src/src/com/aelitis/azureus/core/dht/DHTOperationListener.java
  5. +17 −2 az_src/src/com/aelitis/azureus/core/dht/DHTStorageAdapter.java
  6. +3 −0  az_src/src/com/aelitis/azureus/core/dht/DHTStorageKeyStats.java
  7. +62 −14 az_src/src/com/aelitis/azureus/core/dht/control/DHTControl.java
  8. +3 −1 az_src/src/com/aelitis/azureus/core/dht/control/DHTControlAdapter.java
  9. +3,034 −2,266 az_src/src/com/aelitis/azureus/core/dht/control/impl/DHTControlImpl.java
  10. +64 −4 az_src/src/com/aelitis/azureus/core/dht/control/impl/DHTControlStatsImpl.java
  11. +28 −3 az_src/src/com/aelitis/azureus/core/dht/db/DHTDB.java
  12. +2 −0  az_src/src/com/aelitis/azureus/core/dht/db/DHTDBFactory.java
  13. +9 −0 az_src/src/com/aelitis/azureus/core/dht/db/DHTDBStats.java
  14. +2,247 −297 az_src/src/com/aelitis/azureus/core/dht/db/impl/DHTDBImpl.java
  15. +302 −53 az_src/src/com/aelitis/azureus/core/dht/db/impl/DHTDBMapping.java
  16. +50 −7 az_src/src/com/aelitis/azureus/core/dht/db/impl/DHTDBValueImpl.java
  17. +58 −6 az_src/src/com/aelitis/azureus/core/dht/impl/DHTImpl.java
  18. +1 −1  az_src/src/com/aelitis/azureus/core/dht/impl/DHTLog.java
  19. +116 −19 az_src/src/com/aelitis/azureus/core/dht/nat/impl/DHTNATPuncherImpl.java
  20. +3 −0  az_src/src/com/aelitis/azureus/core/dht/netcoords/DHTNetworkPosition.java
  21. +34 −0 az_src/src/com/aelitis/azureus/core/dht/netcoords/DHTNetworkPositionListener.java
  22. +224 −36 az_src/src/com/aelitis/azureus/core/dht/netcoords/DHTNetworkPositionManager.java
  23. +34 −0 az_src/src/com/aelitis/azureus/core/dht/netcoords/DHTNetworkPositionProviderListener.java
  24. +3 −0  az_src/src/com/aelitis/azureus/core/dht/netcoords/vivaldi/ver1/Coordinates.java
  25. +4 −0 az_src/src/com/aelitis/azureus/core/dht/netcoords/vivaldi/ver1/impl/HeightCoordinatesImpl.java
  26. +5 −0 az_src/src/com/aelitis/azureus/core/dht/netcoords/vivaldi/ver1/impl/VivaldiPositionImpl.java
  27. +1 −1  az_src/src/com/aelitis/azureus/core/dht/netcoords/vivaldi/ver1/impl/tests/VivaldiVisualTest.java
  28. +6 −5 az_src/src/com/aelitis/azureus/core/dht/router/DHTRouter.java
  29. +6 −0 az_src/src/com/aelitis/azureus/core/dht/router/impl/DHTRouterContactImpl.java
  30. +152 −8 az_src/src/com/aelitis/azureus/core/dht/router/impl/DHTRouterImpl.java
  31. +13 −0 az_src/src/com/aelitis/azureus/core/dht/transport/DHTTransport.java
  32. +21 −1 az_src/src/com/aelitis/azureus/core/dht/transport/DHTTransportContact.java
  33. +1 −0  az_src/src/com/aelitis/azureus/core/dht/transport/DHTTransportException.java
  34. +15 −0 az_src/src/com/aelitis/azureus/core/dht/transport/DHTTransportFullStats.java
  35. +3 −0  az_src/src/com/aelitis/azureus/core/dht/transport/DHTTransportListener.java
  36. +34 −0 az_src/src/com/aelitis/azureus/core/dht/transport/DHTTransportQueryStoreReply.java
  37. +7 −0 az_src/src/com/aelitis/azureus/core/dht/transport/DHTTransportReplyHandler.java
  38. +10 −19 az_src/src/com/aelitis/azureus/core/dht/transport/DHTTransportReplyHandlerAdapter.java
  39. +8 −0 az_src/src/com/aelitis/azureus/core/dht/transport/DHTTransportRequestHandler.java
  40. +15 −0 az_src/src/com/aelitis/azureus/core/dht/transport/DHTTransportStats.java
  41. +12 −0 az_src/src/com/aelitis/azureus/core/dht/transport/DHTTransportValue.java
  42. +34 −2 az_src/src/com/aelitis/azureus/core/dht/transport/loopback/DHTTransportLoopbackContactImpl.java
  43. +39 −1 az_src/src/com/aelitis/azureus/core/dht/transport/loopback/DHTTransportLoopbackImpl.java
  44. +6 −0 az_src/src/com/aelitis/azureus/core/dht/transport/loopback/DHTTransportLoopbackStatsImpl.java
  45. +34 −4 az_src/src/com/aelitis/azureus/core/dht/transport/udp/DHTTransportUDP.java
  46. +34 −4 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTTransportUDPContactImpl.java
  47. +594 −35 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTTransportUDPImpl.java
  48. +11 −2 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTTransportUDPStatsImpl.java
  49. +28 −3 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketData.java
  50. +21 −7 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketHelper.java
  51. +14 −8 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketReply.java
  52. +3 −2 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketReplyError.java
  53. +4 −2 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketReplyFindNode.java
  54. +4 −2 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketReplyFindValue.java
  55. +3 −1 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketReplyKeyBlock.java
  56. +4 −2 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketReplyPing.java
  57. +193 −0 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketReplyQueryStorage.java
  58. +3 −1 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketReplyStats.java
  59. +3 −1 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketReplyStore.java
  60. +8 −5 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketRequest.java
  61. +44 −1 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketRequestFindNode.java
  62. +168 −0 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketRequestQueryStorage.java
  63. +3 −3 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPPacketRequestStore.java
  64. +211 −33 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/DHTUDPUtils.java
  65. +40 −21 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/packethandler/DHTUDPPacketHandler.java
  66. +1 −0  az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/packethandler/DHTUDPPacketHandlerException.java
  67. +5 −2 az_src/src/com/aelitis/azureus/core/dht/transport/udp/impl/packethandler/DHTUDPPacketNetworkHandler.java
  68. +15 −0 az_src/src/com/aelitis/azureus/core/dht/transport/util/DHTTransportRequestCounter.java
  69. +161 −11 az_src/src/com/aelitis/azureus/core/dht/transport/util/DHTTransportStatsImpl.java
  70. +851 −406 az_src/src/com/aelitis/azureus/plugins/dht/DHTPlugin.java
  71. +19 −0 az_src/src/com/aelitis/azureus/plugins/dht/DHTPluginContact.java
  72. +4 −0 az_src/src/com/aelitis/azureus/plugins/dht/DHTPluginOperationListener.java
  73. +3 −0  az_src/src/com/aelitis/azureus/plugins/dht/DHTPluginValue.java
  74. +58 −1 az_src/src/com/aelitis/azureus/plugins/dht/impl/DHTPluginContactImpl.java
  75. +220 −33 az_src/src/com/aelitis/azureus/plugins/dht/impl/DHTPluginImpl.java
  76. +337 −102 az_src/src/com/aelitis/azureus/plugins/dht/impl/DHTPluginStorageManager.java
  77. +7 −1 az_src/src/com/aelitis/azureus/plugins/dht/impl/DHTPluginValueImpl.java
View
2  .gitignore
@@ -25,7 +25,7 @@ error.log
.gwt/
# Run output
-plugins/
+./plugins/
speed_check.log
# Eclipse
View
71 az_src/src/com/aelitis/azureus/core/dht/DHT.java
@@ -30,6 +30,7 @@
import com.aelitis.azureus.core.dht.router.DHTRouter;
import com.aelitis.azureus.core.dht.speed.DHTSpeedTester;
import com.aelitis.azureus.core.dht.transport.DHTTransport;
+import com.aelitis.azureus.core.dht.transport.DHTTransportContact;
import com.aelitis.azureus.core.dht.transport.DHTTransportValue;
/**
@@ -51,14 +52,21 @@
public static final String PR_ORIGINAL_REPUBLISH_INTERVAL = "OriginalRepublishInterval";
public static final String PR_CACHE_REPUBLISH_INTERVAL = "CacheRepublishInterval";
- public static final byte FLAG_SINGLE_VALUE = 0x00;
- public static final byte FLAG_DOWNLOADING = 0x01;
- public static final byte FLAG_SEEDING = 0x02;
- public static final byte FLAG_MULTI_VALUE = 0x04;
- public static final byte FLAG_STATS = 0x08;
+ public static final byte FLAG_SINGLE_VALUE = 0x00;
+ public static final byte FLAG_DOWNLOADING = 0x01;
+ public static final byte FLAG_SEEDING = 0x02;
+ public static final byte FLAG_MULTI_VALUE = 0x04;
+ public static final byte FLAG_STATS = 0x08;
+ public static final byte FLAG_ANON = 0x10;
+ public static final byte FLAG_PRECIOUS = 0x20;
+ public static final byte FLAG_PUT_AND_FORGET = 0x40; // local only
+ public static final byte FLAG_OBFUSCATE_LOOKUP = (byte)0x80; // local only
- public static final int MAX_VALUE_SIZE = 256;
+ public static final int MAX_VALUE_SIZE = 512;
+ public static final byte REP_FACT_NONE = 0;
+ public static final byte REP_FACT_DEFAULT = (byte)0xff;
+
// diversification types, don't change as serialised!!!!
public static final byte DT_NONE = 1;
@@ -80,6 +88,47 @@
DHTOperationListener listener );
/**
+ * default is HIGH PRIORITY. if you change to low priority then do so consistently as
+ * operations can get out of order otherwise
+ * @param key
+ * @param description
+ * @param value
+ * @param flags
+ * @param high_priority
+ * @param listener
+ */
+
+ public void
+ put(
+ byte[] key,
+ String description,
+ byte[] value,
+ byte flags,
+ boolean high_priority,
+ DHTOperationListener listener );
+
+ public void
+ put(
+ byte[] key,
+ String description,
+ byte[] value,
+ byte flags,
+ byte life_hours,
+ boolean high_priority,
+ DHTOperationListener listener );
+
+ public void
+ put(
+ byte[] key,
+ String description,
+ byte[] value,
+ byte flags,
+ byte life_hours,
+ byte replication_control, // 4 bits 1->14 republish hours; 0=vuze default | 4 bits 0->15 maintain replicas; [ff=no replication control-use default]
+ boolean high_priority,
+ DHTOperationListener listener );
+
+ /**
* Returns value if originated from here for key
* @param key
* @return
@@ -115,6 +164,13 @@
String description,
DHTOperationListener listener );
+ public byte[]
+ remove(
+ DHTTransportContact[] contacts,
+ byte[] key,
+ String description,
+ DHTOperationListener listener );
+
public boolean
isDiversified(
byte[] key );
@@ -191,5 +247,6 @@
getLogger();
public void
- print();
+ print(
+ boolean full );
}
View
8 az_src/src/com/aelitis/azureus/core/dht/DHTOperationAdapter.java
@@ -43,13 +43,15 @@
}
public void
- diversified()
+ diversified(
+ String desc )
{
}
-
+
public void
found(
- DHTTransportContact contact )
+ DHTTransportContact contact,
+ boolean is_closest )
{
}
View
6 az_src/src/com/aelitis/azureus/core/dht/DHTOperationListener.java
@@ -40,11 +40,13 @@
int active_searches );
public void
- diversified();
+ diversified(
+ String desc );
public void
found(
- DHTTransportContact contact );
+ DHTTransportContact contact,
+ boolean is_closest );
public void
read(
View
19 az_src/src/com/aelitis/azureus/core/dht/DHTStorageAdapter.java
@@ -38,6 +38,9 @@
public interface
DHTStorageAdapter
{
+ public int
+ getNetwork();
+
// local value operations
/**
@@ -91,15 +94,18 @@
getExistingDiversification(
byte[] key,
boolean put_operation,
- boolean exhaustive_get );
+ boolean exhaustive_get,
+ int max_depth );
public byte[][]
createNewDiversification(
+ String description,
DHTTransportContact cause,
byte[] key,
boolean put_operation,
byte diversification_type,
- boolean exhaustive_get );
+ boolean exhaustive_get,
+ int max_depth );
public int
getNextValueVersions(
@@ -130,4 +136,13 @@
public byte[]
getStorageForKey(
String key );
+
+ public int
+ getRemoteFreqDivCount();
+
+ public int
+ getRemoteSizeDivCount();
+
+ public int
+ getKeyCount();
}
View
3  az_src/src/com/aelitis/azureus/core/dht/DHTStorageKeyStats.java
@@ -36,4 +36,7 @@
public byte
getDiversification();
+
+ public String
+ getString();
}
View
76 az_src/src/com/aelitis/azureus/core/dht/control/DHTControl.java
@@ -53,12 +53,18 @@
seed(
boolean full_wait );
+ public boolean
+ isSeeded();
+
public void
put(
byte[] key,
String description,
byte[] value,
byte flags,
+ byte life_hours,
+ byte replication_control,
+ boolean high_priority,
DHTOperationListener listener );
public boolean
@@ -86,6 +92,13 @@
String description,
DHTOperationListener listener );
+ public byte[]
+ remove(
+ DHTTransportContact[] contacts,
+ byte[] key,
+ String description,
+ DHTOperationListener listener );
+
public DHTControlStats
getStats();
@@ -116,14 +129,16 @@
// support methods for DB
- public List
+ public List<DHTTransportContact>
getClosestKContactsList(
byte[] id,
boolean live_only );
- public List
- sortContactsByDistance(
- List contacts );
+ public List<DHTTransportContact>
+ getClosestContactsList(
+ byte[] id,
+ int num_to_return,
+ boolean live_only );
public void
putEncodedKey(
@@ -135,10 +150,18 @@
public void
putDirectEncodedKeys(
- byte[][] keys,
- String description,
- DHTTransportValue[][] value_sets,
- List contacts );
+ byte[][] keys,
+ String description,
+ DHTTransportValue[][] value_sets,
+ List<DHTTransportContact> contacts );
+
+ public void
+ putDirectEncodedKeys(
+ byte[][] keys,
+ String description,
+ DHTTransportValue[][] value_sets,
+ DHTTransportContact contact,
+ DHTOperationListener listener );
public int
computeAndCompareDistances(
@@ -146,6 +169,16 @@
byte[] n2,
byte[] pivot );
+ public byte[]
+ computeDistance(
+ byte[] n1,
+ byte[] n2 );
+
+ public int
+ compareDistances(
+ byte[] n1,
+ byte[] n2 );
+
public boolean
verifyContact(
DHTTransportContact c,
@@ -154,17 +187,31 @@
public boolean
lookup(
byte[] id,
+ String description,
long timeout,
DHTOperationListener listener );
- /**
- * Returns a list of DHTContact objects
- * @return
- */
+ public boolean
+ lookupEncoded(
+ byte[] id,
+ String description,
+ long timeout,
+ boolean high_priority,
+ DHTOperationListener listener );
+
+ public byte[]
+ getObfuscatedKey(
+ byte[] plain_key );
- public List
+
+ public List<DHTControlContact>
getContacts();
+ // debug method only
+
+ public void
+ pingAll();
+
public void
addListener(
DHTControlListener l );
@@ -174,5 +221,6 @@
DHTControlListener l );
public void
- print();
+ print(
+ boolean full );
}
View
4 az_src/src/com/aelitis/azureus/core/dht/control/DHTControlAdapter.java
@@ -38,12 +38,14 @@
public byte[][]
diversify(
+ String description,
DHTTransportContact cause,
boolean put_operation,
boolean existing,
byte[] key,
byte type,
- boolean exhaustive );
+ boolean exhaustive,
+ int max_depth );
public boolean
isDiversified(
View
5,300 az_src/src/com/aelitis/azureus/core/dht/control/impl/DHTControlImpl.java
3,034 additions, 2,266 deletions not shown
View
68 az_src/src/com/aelitis/azureus/core/dht/control/impl/DHTControlStatsImpl.java
@@ -51,7 +51,8 @@
private DHTTransportStats transport_snapshot;
private long[] router_snapshot;
-
+ private int[] value_details_snapshot;
+
protected
DHTControlStatsImpl(
DHTControlImpl _control )
@@ -100,6 +101,8 @@
transport_snapshot = t_stats;
router_snapshot = control.getRouter().getStats().getStats();
+
+ value_details_snapshot = null;
}
public long
@@ -186,10 +189,39 @@
}
// DB
+ protected int[]
+ getValueDetails()
+ {
+ int[] vd = value_details_snapshot;
+
+ if ( vd == null ){
+
+ vd = control.getDataBase().getStats().getValueDetails();
+
+ value_details_snapshot = vd;
+ }
+
+ return( vd );
+ }
+
public long
getDBValuesStored()
{
- return( control.getDataBase().getStats().getValueDetails()[ DHTDBStats.VD_VALUE_COUNT ]);
+ int[] vd = getValueDetails();
+
+ return( vd[ DHTDBStats.VD_VALUE_COUNT ]);
+ }
+
+ public long
+ getDBKeyCount()
+ {
+ return( control.getDataBase().getStats().getKeyCount());
+ }
+
+ public long
+ getDBValueCount()
+ {
+ return( control.getDataBase().getStats().getValueCount());
}
public long
@@ -197,6 +229,29 @@
{
return( control.getDataBase().getStats().getKeyBlockCount());
}
+
+ public long
+ getDBKeyDivSizeCount()
+ {
+ int[] vd = getValueDetails();
+
+ return( vd[ DHTDBStats.VD_DIV_SIZE ]);
+ }
+
+ public long
+ getDBKeyDivFreqCount()
+ {
+ int[] vd = getValueDetails();
+
+ return( vd[ DHTDBStats.VD_DIV_FREQ ]);
+ }
+
+ public long
+ getDBStoreSize()
+ {
+ return( control.getDataBase().getStats().getSize());
+ }
+
// Router
public long
@@ -264,8 +319,13 @@
getRouterLeaves() + "," +
getRouterContacts() +
",database:" +
- getDBValuesStored()+ ","+
- getDBKeysBlocked()+
+ getDBKeyCount() + ","+
+ getDBValueCount() + ","+
+ getDBValuesStored() + ","+
+ getDBStoreSize() + ","+
+ getDBKeyDivFreqCount() + ","+
+ getDBKeyDivSizeCount() + ","+
+ getDBKeysBlocked()+
",version:" + getVersion()+","+
getRouterUptime() + ","+
getRouterCount());
View
31 az_src/src/com/aelitis/azureus/core/dht/db/DHTDB.java
@@ -23,12 +23,14 @@
package com.aelitis.azureus.core.dht.db;
import java.util.Iterator;
+import java.util.List;
import org.gudy.azureus2.core3.util.HashWrapper;
import com.aelitis.azureus.core.dht.DHTStorageBlock;
import com.aelitis.azureus.core.dht.control.DHTControl;
import com.aelitis.azureus.core.dht.transport.DHTTransportContact;
+import com.aelitis.azureus.core.dht.transport.DHTTransportQueryStoreReply;
import com.aelitis.azureus.core.dht.transport.DHTTransportValue;
/**
@@ -55,7 +57,9 @@
store(
HashWrapper key,
byte[] value,
- byte flags );
+ byte flags,
+ byte life_hours,
+ byte replication_control );
/**
* Remote store
@@ -72,6 +76,12 @@
HashWrapper key,
DHTTransportValue[] values );
+ public DHTTransportQueryStoreReply
+ queryStore(
+ DHTTransportContact originating_contact,
+ int header_len,
+ List<Object[]> keys );
+
/**
* Internal lookup for locally originated values
* @param key
@@ -82,6 +92,20 @@
get(
HashWrapper key );
+ /**
+ * Returns a value for the given key (local or remote) if found
+ * @param key
+ * @return
+ */
+
+ public DHTDBValue
+ getAnyValue(
+ HashWrapper key );
+
+ public boolean
+ hasKey(
+ HashWrapper key );
+
public DHTDBLookupResult
get(
DHTTransportContact reader,
@@ -128,12 +152,13 @@
* @return
*/
- public Iterator
+ public Iterator<HashWrapper>
getKeys();
public DHTDBStats
getStats();
public void
- print();
+ print(
+ boolean full );
}
View
2  az_src/src/com/aelitis/azureus/core/dht/db/DHTDBFactory.java
@@ -39,12 +39,14 @@
DHTStorageAdapter adapter,
int original_republish_interval,
int cache_republish_interval,
+ byte protocol_version,
DHTLogger logger )
{
return( new DHTDBImpl(
adapter,
original_republish_interval,
cache_republish_interval,
+ protocol_version,
logger ));
}
}
View
9 az_src/src/com/aelitis/azureus/core/dht/db/DHTDBStats.java
@@ -36,8 +36,17 @@
getKeyCount();
public int
+ getLocalKeyCount();
+
+ public int
getKeyBlockCount();
+ public int
+ getSize();
+
+ public int
+ getValueCount();
+
/**
* returned values indexed by above VD_ constants for meaning
* @return
View
2,544 az_src/src/com/aelitis/azureus/core/dht/db/impl/DHTDBImpl.java
@@ -30,8 +30,12 @@
import org.gudy.azureus2.core3.ipfilter.IpFilterManagerFactory;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.AESemaphore;
-import org.gudy.azureus2.core3.util.AEThread;
+import org.gudy.azureus2.core3.util.AEThread2;
+import org.gudy.azureus2.core3.util.ByteArrayHashMap;
+import org.gudy.azureus2.core3.util.ByteFormatter;
+import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.HashWrapper;
+import org.gudy.azureus2.core3.util.RandomUtils;
import org.gudy.azureus2.core3.util.SimpleTimer;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.core3.util.TimerEvent;
@@ -40,6 +44,7 @@
import com.aelitis.azureus.core.dht.DHT;
import com.aelitis.azureus.core.dht.DHTLogger;
+import com.aelitis.azureus.core.dht.DHTOperationAdapter;
import com.aelitis.azureus.core.dht.DHTStorageAdapter;
import com.aelitis.azureus.core.dht.DHTStorageBlock;
import com.aelitis.azureus.core.dht.DHTStorageKey;
@@ -49,9 +54,11 @@
import com.aelitis.azureus.core.dht.router.DHTRouter;
import com.aelitis.azureus.core.dht.control.DHTControl;
import com.aelitis.azureus.core.dht.transport.DHTTransportContact;
+import com.aelitis.azureus.core.dht.transport.DHTTransportQueryStoreReply;
import com.aelitis.azureus.core.dht.transport.DHTTransportReplyHandlerAdapter;
import com.aelitis.azureus.core.dht.transport.DHTTransportValue;
import com.aelitis.azureus.core.dht.transport.udp.DHTTransportUDP;
+import com.aelitis.azureus.core.util.FeatureAvailability;
import com.aelitis.azureus.core.util.bloom.BloomFilter;
import com.aelitis.azureus.core.util.bloom.BloomFilterFactory;
@@ -64,16 +71,22 @@
DHTDBImpl
implements DHTDB, DHTDBStats
{
+ private static final int MAX_VALUE_LIFETIME = 3*24*60*60*1000;
+
+
private int original_republish_interval;
// the grace period gives the originator time to republish their data as this could involve
// some work on their behalf to find closest nodes etc. There's no real urgency here anyway
- private int ORIGINAL_REPUBLISH_INTERVAL_GRACE = 60*60*1000;
+ public static int ORIGINAL_REPUBLISH_INTERVAL_GRACE = 60*60*1000;
+
+ private static final boolean ENABLE_PRECIOUS_STUFF = false;
+ private static final int PRECIOUS_CHECK_INTERVAL = 2*60*60*1000;
private int cache_republish_interval;
- private long MIN_CACHE_EXPIRY_CHECK_INTERVAL = 60000;
+ private long MIN_CACHE_EXPIRY_CHECK_INTERVAL = 60*1000;
private long last_cache_expiry_check;
private static final long IP_BLOOM_FILTER_REBUILD_PERIOD = 15*60*1000;
@@ -86,7 +99,11 @@
private int next_value_version_left;
- private Map stored_values = new HashMap();
+ protected static final int QUERY_STORE_REQUEST_ENTRY_SIZE = 6;
+ protected static final int QUERY_STORE_REPLY_ENTRY_SIZE = 2;
+
+ private Map<HashWrapper,DHTDBMapping> stored_values = new HashMap<HashWrapper,DHTDBMapping>();
+ private Map<DHTDBMapping.ShortHash,DHTDBMapping> stored_values_prefix_map = new HashMap<DHTDBMapping.ShortHash,DHTDBMapping>();
private DHTControl control;
private DHTStorageAdapter adapter;
@@ -94,12 +111,13 @@
private DHTTransportContact local_contact;
private DHTLogger logger;
- // PIAMOD -- save memory by reducing this from 4 -> 1
- private static final long MAX_TOTAL_SIZE = 1*1024*1024;
+ private static final long MAX_TOTAL_SIZE = 4*1024*1024;
+
+ private int total_size;
+ private int total_values;
+ private int total_keys;
+ private int total_local_keys;
- private long total_size;
- private long total_values;
- private long total_keys;
private boolean force_original_republish;
@@ -107,23 +125,75 @@
private AEMonitor this_mon = new AEMonitor( "DHTDB" );
+ private static final boolean DEBUG_SURVEY = false;
+ private static final boolean SURVEY_ONLY_RF_KEYS = true;
+
+
+ private static final int SURVEY_PERIOD = DEBUG_SURVEY?1*60*1000:15*60*1000;
+ private static final int SURVEY_STATE_INACT_TIMEOUT = DEBUG_SURVEY?5*60*1000:60*60*1000;
+ private static final int SURVEY_STATE_MAX_LIFE_TIMEOUT = 3*60*60*1000 + 30*60*1000;
+ private static final int SURVEY_STATE_MAX_LIFE_RAND = 1*60*60*1000;
+
+ private static final int MAX_SURVEY_SIZE = 100;
+ private static final int MAX_SURVEY_STATE_SIZE = 150;
+
+ private final boolean survey_enabled;
+
+ private volatile boolean survey_in_progress;
+
+ private Map<HashWrapper,Long> survey_mapping_times = new HashMap<HashWrapper, Long>();
+
+ private Map<HashWrapper,SurveyContactState> survey_state =
+ new LinkedHashMap<HashWrapper,SurveyContactState>(MAX_SURVEY_STATE_SIZE,0.75f,true)
+ {
+ protected boolean
+ removeEldestEntry(
+ Map.Entry<HashWrapper,SurveyContactState> eldest)
+ {
+ return size() > MAX_SURVEY_STATE_SIZE;
+ }
+ };
+
public
DHTDBImpl(
DHTStorageAdapter _adapter,
int _original_republish_interval,
int _cache_republish_interval,
+ byte _protocol_version,
DHTLogger _logger )
{
adapter = _adapter==null?null:new adapterFacade( _adapter );
original_republish_interval = _original_republish_interval;
cache_republish_interval = _cache_republish_interval;
logger = _logger;
-
+
+ survey_enabled =
+ _protocol_version >= DHTTransportUDP.PROTOCOL_VERSION_REPLICATION_CONTROL3 &&
+ ( adapter == null ||
+ adapter.getNetwork() == DHT.NW_CVS ||
+ FeatureAvailability.isDHTRepV2Enabled());
+ if ( ENABLE_PRECIOUS_STUFF ){
+
+ SimpleTimer.addPeriodicEvent(
+ "DHTDB:precious",
+ PRECIOUS_CHECK_INTERVAL/4,
+ true, // absolute, we don't want effective time changes (computer suspend/resume) to shift these
+ new TimerEventPerformer()
+ {
+ public void
+ perform(
+ TimerEvent event )
+ {
+ checkPreciousStuff();
+ }
+ });
+ }
SimpleTimer.addPeriodicEvent(
"DHTDB:op",
original_republish_interval,
+ true, // absolute, we don't want effective time changes (computer suspend/resume) to shift these
new TimerEventPerformer()
{
public void
@@ -150,6 +220,7 @@
SimpleTimer.addPeriodicEvent(
"DHTDB:cp",
cache_republish_interval + 10000 - (int)(Math.random()*20000),
+ true, // absolute, we don't want effective time changes (computer suspend/resume) to shift these
new TimerEventPerformer()
{
public void
@@ -207,7 +278,23 @@
}
}
});
-
+
+ if ( survey_enabled ){
+
+ SimpleTimer.addPeriodicEvent(
+ "DHTDB:survey",
+ SURVEY_PERIOD,
+ true,
+ new TimerEventPerformer()
+ {
+ public void
+ perform(
+ TimerEvent event )
+ {
+ survey();
+ }
+ });
+ }
}
@@ -229,11 +316,13 @@
try{
this_mon.enter();
- Iterator it = stored_values.values().iterator();
+ survey_state.clear();
+
+ Iterator<DHTDBMapping> it = stored_values.values().iterator();
while( it.hasNext()){
- DHTDBMapping mapping = (DHTDBMapping)it.next();
+ DHTDBMapping mapping = it.next();
mapping.updateLocalContact( local_contact );
}
@@ -247,24 +336,70 @@
store(
HashWrapper key,
byte[] value,
- byte flags )
+ byte flags,
+ byte life_hours,
+ byte replication_control )
{
+
// local store
- try{
- this_mon.enter();
+ if ( (flags & DHT.FLAG_PUT_AND_FORGET ) == 0 ){
+
+ if (( flags & DHT.FLAG_OBFUSCATE_LOOKUP ) != 0 ){
- // don't police max check for locally stored data
- // only that received
+ Debug.out( "Obfuscated puts without 'put-and-forget' are not supported as original-republishing of them is not implemented" );
+ }
- DHTDBMapping mapping = (DHTDBMapping)stored_values.get( key );
+ if ( life_hours > 0 ){
+
+ if ( life_hours*60*60*1000 < original_republish_interval ){
+
+ Debug.out( "Don't put persistent values with a lifetime less than republish period - lifetime over-ridden" );
+
+ life_hours = 0;
+ }
+ }
- if ( mapping == null ){
+ try{
+ this_mon.enter();
+
+ total_local_keys++;
- mapping = new DHTDBMapping( this, key, true );
+ // don't police max check for locally stored data
+ // only that received
- stored_values.put( key, mapping );
+ DHTDBMapping mapping = (DHTDBMapping)stored_values.get( key );
+
+ if ( mapping == null ){
+
+ mapping = new DHTDBMapping( this, key, true );
+
+ stored_values.put( key, mapping );
+
+ addToPrefixMap( mapping );
+ }
+
+ DHTDBValueImpl res =
+ new DHTDBValueImpl(
+ SystemTime.getCurrentTime(),
+ value,
+ getNextValueVersion(),
+ local_contact,
+ local_contact,
+ true,
+ flags,
+ life_hours,
+ replication_control );
+
+ mapping.add( res );
+
+ return( res );
+
+ }finally{
+
+ this_mon.exit();
}
+ }else{
DHTDBValueImpl res =
new DHTDBValueImpl(
@@ -274,18 +409,26 @@
local_contact,
local_contact,
true,
- flags );
-
- mapping.add( res );
+ flags,
+ life_hours,
+ replication_control );
return( res );
-
- }finally{
-
- this_mon.exit();
}
}
+ /*
+ private long store_ops;
+ private long store_ops_bad1;
+ private long store_ops_bad2;
+
+ private void
+ logStoreOps()
+ {
+ System.out.println( "sops (" + control.getTransport().getNetwork() + ")=" + store_ops + ",bad1=" + store_ops_bad1 + ",bad2=" + store_ops_bad2 );
+ }
+ */
+
public byte
store(
DHTTransportContact sender,
@@ -302,75 +445,7 @@
return( DHT.DT_SIZE );
}
- // remote store for cache values
-
- // Make sure that we only accept values for storing that are reasonable.
- // Assumption is that the caller has made a reasonable effort to ascertain
- // the correct place to store a value. Part of this will in general have
- // needed them to query us for example. Therefore, limit values to those
- // that are at least as close to us
-
- List closest_contacts = control.getClosestKContactsList( key.getHash(), true );
-
- boolean store_it = false;
-
- for (int i=0;i<closest_contacts.size();i++){
-
- if ( router.isID(((DHTTransportContact)closest_contacts.get(i)).getID())){
-
- store_it = true;
-
- break;
- }
- }
-
- if ( !store_it ){
-
- DHTLog.log( "Not storing " + DHTLog.getString2(key.getHash()) + " as key too far away" );
-
- return( DHT.DT_NONE );
- }
-
- // next, for cache forwards (rather then values coming directly from
- // originators) we ensure that the contact sending the values to us is
- // close enough. If any values are coming indirect then we can safely assume
- // that they all are
-
- boolean cache_forward = false;
-
- for (int i=0;i<values.length;i++){
-
- if (!Arrays.equals( sender.getID(), values[i].getOriginator().getID())){
-
- cache_forward = true;
-
- break;
- }
- }
-
-
- if ( cache_forward ){
-
- // get the closest contacts to me
-
- byte[] my_id = local_contact.getID();
-
- closest_contacts = control.getClosestKContactsList( my_id, true );
-
- DHTTransportContact furthest = (DHTTransportContact)closest_contacts.get( closest_contacts.size()-1);
-
- if ( control.computeAndCompareDistances( furthest.getID(), sender.getID(), my_id ) < 0 ){
-
- store_it = false;
- }
- }
-
- if ( !store_it ){
-
- DHTLog.log( "Not storing " + DHTLog.getString2(key.getHash()) + " as cache forward and sender too far away" );
-
- return( DHT.DT_NONE );
- }
+ // logStoreOps();
try{
this_mon.enter();
@@ -384,49 +459,20 @@
mapping = new DHTDBMapping( this, key, false );
stored_values.put( key, mapping );
+
+ addToPrefixMap( mapping );
}
-
- boolean contact_checked = false;
- boolean contact_ok = false;
-
+
// we carry on an update as its ok to replace existing entries
// even if diversified
for (int i=0;i<values.length;i++){
- DHTTransportValue t_value = values[i];
-
- // last check, verify that the contact is who they say they are, only for non-forwards
- // as cache forwards are only accepted if they are "close enough" and we can't
- // rely on their identify due to the way that cache republish works (it doesn't
- // guarantee a "lookup_node" prior to "store".
-
DHTTransportValue value = values[i];
-
- boolean ok_to_store = false;
-
- boolean direct =Arrays.equals( sender.getID(), value.getOriginator().getID());
-
- if ( !contact_checked ){
-
- contact_ok = control.verifyContact( sender, direct );
-
- if ( !contact_ok ){
-
- logger.log( "DB: verification of contact '" + sender.getName() + "' failed for store operation" );
- }
-
- contact_checked = true;
- }
-
- ok_to_store = contact_ok;
-
- if ( ok_to_store ){
- DHTDBValueImpl mapping_value = new DHTDBValueImpl( sender, value, false );
+ DHTDBValueImpl mapping_value = new DHTDBValueImpl( sender, value, false );
- mapping.add( mapping_value );
- }
+ mapping.add( mapping_value );
}
return( mapping.getDiversificationType());
@@ -490,7 +536,7 @@
get(
HashWrapper key )
{
- // local remove
+ // local get
try{
this_mon.enter();
@@ -511,6 +557,43 @@
}
public DHTDBValue
+ getAnyValue(
+ HashWrapper key )
+ {
+ try{
+ this_mon.enter();
+
+ DHTDBMapping mapping = (DHTDBMapping)stored_values.get( key );
+
+ if ( mapping != null ){
+
+ return( mapping.getAnyValue( local_contact ));
+ }
+
+ return( null );
+
+ }finally{
+
+ this_mon.exit();
+ }
+ }
+
+ public boolean
+ hasKey(
+ HashWrapper key )
+ {
+ try{
+ this_mon.enter();
+
+ return( stored_values.containsKey( key ));
+
+ }finally{
+
+ this_mon.exit();
+ }
+ }
+
+ public DHTDBValue
remove(
DHTTransportContact originator,
HashWrapper key )
@@ -527,6 +610,17 @@
DHTDBValueImpl res = mapping.remove( originator );
if ( res != null ){
+
+ total_local_keys--;
+
+ if ( !mapping.getValues().hasNext()){
+
+ stored_values.remove( key );
+
+ removeFromPrefixMap( mapping );
+
+ mapping.destroy();
+ }
return( res.getValueForDeletion( getNextValueVersion()));
}
@@ -560,13 +654,13 @@
byte[] key = adapter.getKeyForKeyBlock( request );
- List closest_contacts = control.getClosestKContactsList( key, true );
+ List<DHTTransportContact> closest_contacts = control.getClosestKContactsList( key, true );
boolean process_it = false;
for (int i=0;i<closest_contacts.size();i++){
- if ( router.isID(((DHTTransportContact)closest_contacts.get(i)).getID())){
+ if ( router.isID(closest_contacts.get(i).getID())){
process_it = true;
@@ -634,6 +728,24 @@
return( (int)total_keys );
}
+ public int
+ getLocalKeyCount()
+ {
+ return( total_local_keys );
+ }
+
+ public int
+ getValueCount()
+ {
+ return( (int)total_values );
+ }
+
+ public int
+ getSize()
+ {
+ return( (int)total_size );
+ }
+
public int[]
getValueDetails()
{
@@ -642,11 +754,11 @@
int[] res = new int[6];
- Iterator it = stored_values.values().iterator();
+ Iterator<DHTDBMapping> it = stored_values.values().iterator();
while( it.hasNext()){
- DHTDBMapping mapping = (DHTDBMapping)it.next();
+ DHTDBMapping mapping = it.next();
res[DHTDBStats.VD_VALUE_COUNT] += mapping.getValueCount();
res[DHTDBStats.VD_LOCAL_SIZE] += mapping.getLocalSize();
@@ -662,6 +774,19 @@
}else if ( dt == DHT.DT_SIZE ){
res[DHTDBStats.VD_DIV_SIZE]++;
+
+ /*
+ Iterator<DHTDBValueImpl> it2 = mapping.getIndirectValues();
+
+ System.out.println( "values=" + mapping.getValueCount());
+
+ while( it2.hasNext()){
+
+ DHTDBValueImpl val = it2.next();
+
+ System.out.println( new String( val.getValue()) + " - " + val.getOriginator().getAddress());
+ }
+ */
}
}
@@ -684,13 +809,13 @@
return( adapter.getDirectKeyBlocks().length );
}
- public Iterator
+ public Iterator<HashWrapper>
getKeys()
{
try{
this_mon.enter();
- return( new ArrayList( stored_values.keySet()).iterator());
+ return( new ArrayList<HashWrapper>( stored_values.keySet()).iterator());
}finally{
@@ -703,28 +828,28 @@
{
int values_published = 0;
- Map republish = new HashMap();
+ Map<HashWrapper,List<DHTDBValueImpl>> republish = new HashMap<HashWrapper,List<DHTDBValueImpl>>();
try{
this_mon.enter();
- Iterator it = stored_values.entrySet().iterator();
+ Iterator<Map.Entry<HashWrapper,DHTDBMapping>> it = stored_values.entrySet().iterator();
while( it.hasNext()){
- Map.Entry entry = (Map.Entry)it.next();
+ Map.Entry<HashWrapper,DHTDBMapping> entry = it.next();
HashWrapper key = (HashWrapper)entry.getKey();
DHTDBMapping mapping = (DHTDBMapping)entry.getValue();
- Iterator it2 = mapping.getValues();
+ Iterator<DHTDBValueImpl> it2 = mapping.getValues();
- List values = new ArrayList();
+ List<DHTDBValueImpl> values = new ArrayList<DHTDBValueImpl>();
while( it2.hasNext()){
- DHTDBValueImpl value = (DHTDBValueImpl)it2.next();
+ DHTDBValueImpl value = it2.next();
if ( value != null && value.isLocal()){
@@ -747,15 +872,15 @@
this_mon.exit();
}
- Iterator it = republish.entrySet().iterator();
+ Iterator<Map.Entry<HashWrapper,List<DHTDBValueImpl>>> it = republish.entrySet().iterator();
while( it.hasNext()){
- Map.Entry entry = (Map.Entry)it.next();
+ Map.Entry<HashWrapper,List<DHTDBValueImpl>> entry = it.next();
HashWrapper key = (HashWrapper)entry.getKey();
- List values = (List)entry.getValue();
+ List<DHTDBValueImpl> values = entry.getValue();
// no point in worry about multi-value puts here as it is extremely unlikely that
// > 1 value will locally stored, or > 1 value will go to the same contact
@@ -764,7 +889,7 @@
values_published++;
- control.putEncodedKey( key.getHash(), "Republish", (DHTDBValueImpl)values.get(i), 0, true );
+ control.putEncodedKey( key.getHash(), "Republish", values.get(i), 0, true );
}
}
@@ -779,7 +904,9 @@
router.refreshIdleLeaves( cache_republish_interval );
- final Map republish = new HashMap();
+ final Map<HashWrapper,List<DHTDBValueImpl>> republish = new HashMap<HashWrapper,List<DHTDBValueImpl>>();
+
+ List<DHTDBMapping> republish_via_survey = new ArrayList<DHTDBMapping>();
long now = System.currentTimeMillis();
@@ -788,15 +915,15 @@
checkCacheExpiration( true );
- Iterator it = stored_values.entrySet().iterator();
+ Iterator<Map.Entry<HashWrapper,DHTDBMapping>> it = stored_values.entrySet().iterator();
while( it.hasNext()){
- Map.Entry entry = (Map.Entry)it.next();
+ Map.Entry<HashWrapper,DHTDBMapping> entry = it.next();
- HashWrapper key = (HashWrapper)entry.getKey();
+ HashWrapper key = entry.getKey();
- DHTDBMapping mapping = (DHTDBMapping)entry.getValue();
+ DHTDBMapping mapping = entry.getValue();
// assume that if we've diversified then the other k-1 locations are under similar
// stress and will have done likewise - no point in republishing cache values to them
@@ -808,15 +935,26 @@
continue;
}
- Iterator it2 = mapping.getValues();
+ Iterator<DHTDBValueImpl> it2 = mapping.getValues();
- List values = new ArrayList();
+ boolean all_rf_values = it2.hasNext();
+
+ List<DHTDBValueImpl> values = new ArrayList<DHTDBValueImpl>();
while( it2.hasNext()){
- DHTDBValueImpl value = (DHTDBValueImpl)it2.next();
+ DHTDBValueImpl value = it2.next();
- if ( !value.isLocal()){
+ if ( value.isLocal()){
+
+ all_rf_values = false;
+
+ }else{
+
+ if ( value.getReplicationFactor() == DHT.REP_FACT_DEFAULT ){
+
+ all_rf_values = false;
+ }
// if this value was stored < period ago then we assume that it was
// also stored to the other k-1 locations at the same time and therefore
@@ -839,10 +977,19 @@
}
}
- if ( values.size() > 0 ){
+ if ( all_rf_values ){
- republish.put( key, values );
+ // if surveying is disabled then we swallow values here to prevent them
+ // from being replicated using the existing technique and muddying the waters
+ values.clear(); // handled by the survey process
+
+ republish_via_survey.add( mapping );
+ }
+
+ if ( values.size() > 0 ){
+
+ republish.put( key, values );
}
}
}finally{
@@ -850,11 +997,71 @@
this_mon.exit();
}
+ if ( republish_via_survey.size() > 0 ){
+
+ // we still check for being too far away here
+
+ List<HashWrapper> stop_caching = new ArrayList<HashWrapper>();
+
+ for ( DHTDBMapping mapping: republish_via_survey ){
+
+ HashWrapper key = mapping.getKey();
+
+ byte[] lookup_id = key.getHash();
+
+ List<DHTTransportContact> contacts = control.getClosestKContactsList( lookup_id, false );
+
+ // if we are no longer one of the K closest contacts then we shouldn't
+ // cache the value
+
+ boolean keep_caching = false;
+
+ for (int j=0;j<contacts.size();j++){
+
+ if ( router.isID(((DHTTransportContact)contacts.get(j)).getID())){
+
+ keep_caching = true;
+
+ break;
+ }
+ }
+
+ if ( !keep_caching ){
+
+ DHTLog.log( "Dropping cache entry for " + DHTLog.getString( lookup_id ) + " as now too far away" );
+
+ stop_caching.add( key );
+ }
+ }
+
+ if ( stop_caching.size() > 0 ){
+
+ try{
+ this_mon.enter();
+
+ for (int i=0;i<stop_caching.size();i++){
+
+ DHTDBMapping mapping = (DHTDBMapping)stored_values.remove( stop_caching.get(i));
+
+ if ( mapping != null ){
+
+ removeFromPrefixMap( mapping );
+
+ mapping.destroy();
+ }
+ }
+ }finally{
+
+ this_mon.exit();
+ }
+ }
+ }
+
final int[] values_published = {0};
final int[] keys_published = {0};
final int[] republish_ops = {0};
- final HashSet anti_spoof_done = new HashSet();
+ final HashSet<DHTTransportContact> anti_spoof_done = new HashSet<DHTTransportContact>();
if ( republish.size() > 0 ){
@@ -868,19 +1075,19 @@
// (that's required to keep the DHT alive in general) to ensure that all
// k-buckets are reasonably up-to-date
- Iterator it = republish.entrySet().iterator();
+ Iterator<Map.Entry<HashWrapper,List<DHTDBValueImpl>>> it1 = republish.entrySet().iterator();
- List stop_caching = new ArrayList();
+ List<HashWrapper> stop_caching = new ArrayList<HashWrapper>();
// build a map of contact -> list of keys to republish
- Map contact_map = new HashMap();
+ Map<HashWrapper,Object[]> contact_map = new HashMap<HashWrapper,Object[]>();
- while( it.hasNext()){
+ while( it1.hasNext()){
- Map.Entry entry = (Map.Entry)it.next();
+ Map.Entry<HashWrapper,List<DHTDBValueImpl>> entry = it1.next();
- HashWrapper key = (HashWrapper)entry.getKey();
+ HashWrapper key = entry.getKey();
byte[] lookup_id = key.getHash();
@@ -889,7 +1096,7 @@
// is a bad idea as failures may rack up against the live ones due
// to network problems and kill them, leaving the dead ones!
- List contacts = control.getClosestKContactsList( lookup_id, false );
+ List<DHTTransportContact> contacts = control.getClosestKContactsList( lookup_id, false );
// if we are no longer one of the K closest contacts then we shouldn't
// cache the value
@@ -929,20 +1136,20 @@
if ( data == null ){
- data = new Object[]{ contact, new ArrayList()};
+ data = new Object[]{ contact, new ArrayList<HashWrapper>()};
contact_map.put( new HashWrapper(contact.getID()), data );
}
- ((List)data[1]).add( key );
+ ((List<HashWrapper>)data[1]).add( key );
}
}
- it = contact_map.values().iterator();
+ Iterator<Object[]> it2 = contact_map.values().iterator();
- while( it.hasNext()){
+ while( it2.hasNext()){
- final Object[] data = (Object[])it.next();
+ final Object[] data = it2.next();
final DHTTransportContact contact = (DHTTransportContact)data[0];
@@ -964,7 +1171,7 @@
try{
// System.out.println( "cacheForward: pre-store findNode OK" );
- List keys = (List)data[1];
+ List<HashWrapper> keys = (List<HashWrapper>)data[1];
byte[][] store_keys = new byte[keys.size()][];
DHTTransportValue[][] store_values = new DHTTransportValue[store_keys.length][];
@@ -973,11 +1180,11 @@
for (int i=0;i<store_keys.length;i++){
- HashWrapper wrapper = (HashWrapper)keys.get(i);
+ HashWrapper wrapper = keys.get(i);
store_keys[i] = wrapper.getHash();
- List values = (List)republish.get( wrapper );
+ List<DHTDBValueImpl> values = republish.get( wrapper );
store_values[i] = new DHTTransportValue[values.size()];
@@ -985,7 +1192,7 @@
for (int j=0;j<values.size();j++){
- DHTDBValueImpl value = (DHTDBValueImpl)values.get(j);
+ DHTDBValueImpl value = values.get(j);
// we reduce the cache distance by 1 here as it is incremented by the
// recipients
@@ -994,7 +1201,7 @@
}
}
- List contacts = new ArrayList();
+ List<DHTTransportContact> contacts = new ArrayList<DHTTransportContact>();
contacts.add( contact );
@@ -1043,6 +1250,8 @@
if ( mapping != null ){
+ removeFromPrefixMap( mapping );
+
mapping.destroy();
}
}
@@ -1087,7 +1296,12 @@
continue;
}
- if ( contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_BLOCK_KEYS ){
+ if ( router.isID( contact.getID())){
+
+ continue; // ignore ourselves
+ }
+
+ if ( contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_BLOCK_KEYS ){
final Runnable task =
new Runnable()
@@ -1150,121 +1364,1625 @@
},
contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_ANTI_SPOOF2?new byte[0]:new byte[20] );
}
- }
- }
+ }
+ }
+ }
+ }
+
+ return( new int[]{ values_published[0], keys_published[0], republish_ops[0] });
+ }
+
+ protected void
+ checkCacheExpiration(
+ boolean force )
+ {
+ long now = SystemTime.getCurrentTime();
+
+ if ( !force ){
+
+ long elapsed = now - last_cache_expiry_check;
+
+ if ( elapsed > 0 && elapsed < MIN_CACHE_EXPIRY_CHECK_INTERVAL ){
+
+ return;
+ }
+ }
+
+ try{
+ this_mon.enter();
+
+ last_cache_expiry_check = now;
+
+ Iterator<DHTDBMapping> it = stored_values.values().iterator();
+
+ while( it.hasNext()){
+
+ DHTDBMapping mapping = it.next();
+
+ if ( mapping.getValueCount() == 0 ){
+
+ it.remove();
+
+ removeFromPrefixMap( mapping );
+
+ mapping.destroy();
+
+ }else{
+
+ Iterator<DHTDBValueImpl> it2 = mapping.getValues();
+
+ while( it2.hasNext()){
+
+ DHTDBValueImpl value = it2.next();
+
+ if ( !value.isLocal()){
+
+ // distance 1 = initial store location. We use the initial creation date
+ // when deciding whether or not to remove this, plus a bit, as the
+ // original publisher is supposed to republish these
+
+ int life_hours = value.getLifeTimeHours();
+
+ int max_age;
+
+ if ( life_hours < 1 ){
+
+ max_age = original_republish_interval;
+
+ }else{
+
+ max_age = life_hours * 60*60*1000;
+
+ if ( max_age > MAX_VALUE_LIFETIME ){
+
+ max_age = MAX_VALUE_LIFETIME;
+ }
+ }
+
+ int grace;
+
+ if (( value.getFlags() & DHT.FLAG_PUT_AND_FORGET ) != 0 ){
+
+ grace = 0;
+
+ }else{
+
+ // scale the grace period for short lifetimes
+
+ grace = Math.min( ORIGINAL_REPUBLISH_INTERVAL_GRACE, max_age/4 );
+ }
+
+ if ( now > value.getCreationTime() + max_age + grace ){
+
+ DHTLog.log( "removing cache entry (" + value.getString() + ")" );
+
+ it2.remove();
+ }
+ }
+ }
+ }
+ }
+ }finally{
+
+ this_mon.exit();
+ }
+ }
+
+ protected void
+ addToPrefixMap(
+ DHTDBMapping mapping )
+ {
+ DHTDBMapping.ShortHash key = mapping.getShortKey();
+
+ DHTDBMapping existing = stored_values_prefix_map.get( key );
+
+ // possible to have clashes, be consistent in which one we use to avoid
+ // confusing other nodes
+
+ if ( existing != null ){
+
+ byte[] existing_full = existing.getKey().getBytes();
+ byte[] new_full = mapping.getKey().getBytes();
+
+ if ( control.computeAndCompareDistances( existing_full, new_full, local_contact.getID()) < 0 ){
+
+ return;
+ }
+ }
+
+ stored_values_prefix_map.put( key, mapping );
+
+ if ( stored_values_prefix_map.size() > stored_values.size()){
+
+ Debug.out( "inconsistent" );
+ }
+ }
+
+ protected void
+ removeFromPrefixMap(
+ DHTDBMapping mapping )
+ {
+ DHTDBMapping.ShortHash key = mapping.getShortKey();
+
+ DHTDBMapping existing = stored_values_prefix_map.get( key );
+
+ if ( existing == mapping ){
+
+ stored_values_prefix_map.remove( key );
+ }
+ }
+
+ protected void
+ checkPreciousStuff()
+ {
+ long now = SystemTime.getCurrentTime();
+
+ Map<HashWrapper,List<DHTDBValueImpl>> republish = new HashMap<HashWrapper,List<DHTDBValueImpl>>();
+
+ try{
+
+ this_mon.enter();
+
+ Iterator<Map.Entry<HashWrapper,DHTDBMapping>> it = stored_values.entrySet().iterator();
+
+ while( it.hasNext()){
+
+ Map.Entry<HashWrapper,DHTDBMapping> entry = it.next();
+
+ HashWrapper key = entry.getKey();
+
+ DHTDBMapping mapping = entry.getValue();
+
+ Iterator<DHTDBValueImpl> it2 = mapping.getValues();
+
+ List<DHTDBValueImpl> values = new ArrayList<DHTDBValueImpl>();
+
+ while( it2.hasNext()){
+
+ DHTDBValueImpl value = it2.next();
+
+ if ( value.isLocal()){
+
+ if (( value.getFlags() | DHT.FLAG_PRECIOUS ) != 0 ){
+
+ if ( now - value.getCreationTime() > PRECIOUS_CHECK_INTERVAL ){
+
+ value.setCreationTime();
+
+ values.add( value );
+ }
+ }
+ }
+ }
+
+ if ( values.size() > 0 ){
+
+ republish.put( key, values );
+
+ }
+ }
+ }finally{
+
+ this_mon.exit();
+ }
+
+ Iterator<Map.Entry<HashWrapper,List<DHTDBValueImpl>>> it = republish.entrySet().iterator();
+
+ while( it.hasNext()){
+
+ Map.Entry<HashWrapper,List<DHTDBValueImpl>> entry = it.next();
+
+ HashWrapper key = entry.getKey();
+
+ List<DHTDBValueImpl> values = entry.getValue();
+
+ // no point in worry about multi-value puts here as it is extremely unlikely that
+ // > 1 value will locally stored, or > 1 value will go to the same contact
+
+ for (int i=0;i<values.size();i++){
+
+ control.putEncodedKey( key.getHash(), "Precious republish", values.get(i), 0, true );
+ }
+ }
+ }
+
+ protected DHTTransportContact
+ getLocalContact()
+ {
+ return( local_contact );
+ }
+
+ protected DHTStorageAdapter
+ getAdapter()
+ {
+ return( adapter );
+ }
+
+ protected void
+ log(
+ String str )
+ {
+ logger.log( str );
+ }
+
+ public DHTDBStats
+ getStats()
+ {
+ return( this );
+ }
+
+ protected void
+ survey()
+ {
+ if ( survey_in_progress ){
+
+ return;
+ }
+
+ if ( DEBUG_SURVEY ){
+ System.out.println( "surveying" );
+ }
+
+ checkCacheExpiration( false );
+
+ final byte[] my_id = router.getID();
+
+ if ( DEBUG_SURVEY ){
+ System.out.println( " my_id=" + ByteFormatter.encodeString( my_id ));
+ }
+
+ final ByteArrayHashMap<DHTTransportContact> id_map = new ByteArrayHashMap<DHTTransportContact>();
+
+ List<DHTTransportContact> all_contacts = control.getClosestContactsList( my_id, router.getK()*3, true );
+
+ for ( DHTTransportContact contact: all_contacts ){
+
+ id_map.put( contact.getID(), contact );
+ }
+
+ byte[] max_key = my_id;
+ byte[] max_dist = null;
+
+ final List<HashWrapper> applicable_keys = new ArrayList<HashWrapper>();
+
+ try{
+ this_mon.enter();
+
+ long now = SystemTime.getMonotonousTime();
+
+ Iterator<SurveyContactState> s_it = survey_state.values().iterator();
+
+ while( s_it.hasNext()){
+
+ if ( s_it.next().timeout( now )){
+
+ s_it.remove();
+ }
+ }
+
+ Iterator<DHTDBMapping> it = stored_values.values().iterator();
+
+ Set<HashWrapper> existing_times = new HashSet<HashWrapper>( survey_mapping_times.keySet());
+
+ while( it.hasNext()){
+
+ DHTDBMapping mapping = it.next();
+
+ HashWrapper hw = mapping.getKey();
+
+ if ( existing_times.size() > 0 ){
+
+ existing_times.remove( hw );
+ }
+
+ if ( !applyRF( mapping )){
+
+ continue;
+ }
+
+ applicable_keys.add( hw );
+
+ byte[] key = hw.getBytes();
+
+ /*
+ List<DHTTransportContact> contacts = control.getClosestKContactsList( key, true );
+
+ for ( DHTTransportContact c: contacts ){
+
+ id_map.put( c.getID(), c );
+ }
+ */
+
+ byte[] distance = control.computeDistance( my_id, key );
+
+ if ( max_dist == null || control.compareDistances( distance, max_dist ) > 0 ){
+
+ max_dist = distance;
+ max_key = key;
+ }
+ }
+
+ // remove dead mappings
+
+ for ( HashWrapper hw: existing_times ){
+
+ survey_mapping_times.remove( hw );
+ }
+
+ logger.log( "Survey starts: state size=" + survey_state.size() + ", all keys=" + stored_values.size() + ", applicable keys=" + applicable_keys.size());
+
+ }finally{
+
+ this_mon.exit();
+ }
+
+ if ( DEBUG_SURVEY ){
+ System.out.println( " max_key=" + ByteFormatter.encodeString( max_key ) + ", dist=" + ByteFormatter.encodeString( max_dist ) + ", initial_contacts=" + id_map.size());
+ }
+
+ if ( max_key == my_id ){
+
+ logger.log( "Survey complete - no applicable values" );
+
+ return;
+ }
+
+ // obscure key so we don't leak any keys
+
+ byte[] obscured_key = control.getObfuscatedKey( max_key );
+
+ final int[] requery_count = { 0 };
+
+ final boolean[] processing = { false };
+
+ try{
+ survey_in_progress = true;
+
+ control.lookupEncoded(
+ obscured_key,
+ "Neighbourhood survey: basic",
+ 0,
+ true,
+ new DHTOperationAdapter()
+ {
+ private List<DHTTransportContact> contacts = new ArrayList<DHTTransportContact>();
+
+ private boolean survey_complete;
+
+ public void
+ found(
+ DHTTransportContact contact,
+ boolean is_closest )
+ {
+ if ( is_closest ){
+
+ synchronized( contacts ){
+
+ if ( !survey_complete ){
+
+ contacts.add( contact );
+ }
+ }
+ }
+ }
+
+ public void
+ complete(
+ boolean timeout )
+ {
+ boolean requeried = false;
+
+ try{
+ int hits = 0;
+ int misses = 0;
+
+ // find the closest miss to us and recursively search
+
+ byte[] min_dist = null;
+ byte[] min_id = null;
+
+ synchronized( contacts ){
+
+ for ( DHTTransportContact c: contacts ){
+
+ byte[] id = c.getID();
+
+ if ( id_map.containsKey( id )){
+
+ hits++;
+
+ }else{
+
+ misses++;
+
+ if ( id_map.size() >= MAX_SURVEY_SIZE ){
+
+ log( "Max survery size exceeded" );
+
+ break;
+ }
+
+ id_map.put( id, c );
+
+ byte[] distance = control.computeDistance( my_id, id );
+
+ if ( min_dist == null || control.compareDistances( distance, min_dist ) < 0 ){
+
+ min_dist = distance;
+ min_id = id;
+ }
+ }
+ }
+
+ contacts.clear();
+ }
+
+ // if significant misses then re-query
+
+ if ( misses > 0 && misses*100/(hits+misses) >= 25 && id_map.size()< MAX_SURVEY_SIZE ){
+
+ if ( requery_count[0]++ < 5 ){
+
+ if ( DEBUG_SURVEY ){
+ System.out.println( "requery at " + ByteFormatter.encodeString( min_id ));
+ }
+
+ // don't need to obscure here as its a node-id
+
+ control.lookupEncoded(
+ min_id,
+ "Neighbourhood survey: level=" + requery_count[0],
+ 0,
+ true,
+ this );
+
+ requeried = true;
+
+ }else{
+
+ if ( DEBUG_SURVEY ){
+ System.out.println( "requery limit exceeded" );
+ }
+ }
+ }else{
+
+ if ( DEBUG_SURVEY ){
+ System.out.println( "super-neighbourhood=" + id_map.size() + " (hits=" + hits + ", misses=" + misses + ", level=" + requery_count[0] + ")" );
+ }
+ }
+ }finally{
+
+ if ( !requeried ){
+
+ synchronized( contacts ){
+
+ survey_complete = true;
+ }
+
+ if ( DEBUG_SURVEY ){
+ System.out.println( "survey complete: nodes=" + id_map.size());
+ }
+
+ processSurvey( my_id, applicable_keys, id_map );
+
+ processing[0] = true;
+ }
+ }
+ }
+ });
+
+ }catch( Throwable e ){
+
+ if ( !processing[0] ){
+
+ logger.log( "Survey complete - no applicable nodes" );
+
+ survey_in_progress = false;
+ }
+ }
+ }
+
+ protected void
+ processSurvey(
+ byte[] survey_my_id,
+ List<HashWrapper> applicable_keys,
+ ByteArrayHashMap<DHTTransportContact> survey )
+ {
+ boolean went_async = false;
+
+ try{
+ byte[][] node_ids = new byte[survey.size()][];
+
+ int pos = 0;
+
+ for ( byte[] id: survey.keys()){
+
+ node_ids[pos++] = id;
+ }
+
+ ByteArrayHashMap<List<DHTDBMapping>> value_map = new ByteArrayHashMap<List<DHTDBMapping>>();
+
+ Map<DHTTransportContact,ByteArrayHashMap<List<DHTDBMapping>>> request_map = new HashMap<DHTTransportContact, ByteArrayHashMap<List<DHTDBMapping>>>();
+
+ Map<DHTDBMapping,List<DHTTransportContact>> mapping_to_node_map = new HashMap<DHTDBMapping, List<DHTTransportContact>>();
+
+ int max_nodes = Math.min( node_ids.length, router.getK());
+
+ try{
+ this_mon.enter();
+
+ Iterator<HashWrapper> it = applicable_keys.iterator();
+
+ int value_count = 0;
+
+ while( it.hasNext()){
+
+ DHTDBMapping mapping = stored_values.get( it.next());
+
+ if ( mapping == null ){
+
+ continue;
+ }
+
+ value_count++;
+
+ final byte[] key = mapping.getKey().getBytes();
+
+ // find closest nodes to this key in order to asses availability
+
+ Arrays.sort(
+ node_ids,
+ new Comparator<byte[]>()
+ {
+ public int
+ compare(
+ byte[] o1,
+ byte[] o2 )
+ {
+ return( control.computeAndCompareDistances( o1, o2, key ));
+ }
+ });
+
+ boolean found_myself = false;
+
+ for ( int i=0;i<max_nodes;i++ ){
+
+ byte[] id = node_ids[i];
+
+ if ( Arrays.equals( survey_my_id, id )){
+
+ found_myself = true;
+
+ break;
+ }
+ }
+
+ // if we're not in the closest set to this key then ignore it
+
+ if ( !found_myself ){
+
+ if ( DEBUG_SURVEY ){
+ System.out.println( "we're not in closest set for " + ByteFormatter.encodeString( key ) + " - ignoring" );
+ }
+
+ continue;
+ }
+
+ List<DHTTransportContact> node_list = new ArrayList<DHTTransportContact>(max_nodes);
+
+ mapping_to_node_map.put( mapping, node_list );
+
+ for ( int i=0;i<max_nodes;i++ ){