Skip to content

Commit

Permalink
Adding example of using map / reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
ayende committed Mar 6, 2010
0 parents commit 0c25800
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

/bin/
/obj/
10 changes: 10 additions & 0 deletions IStorage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Collections.Generic;

namespace MapReduce
{
/// <summary>
/// Storage abstraction used by the map/reduce pipeline to keep intermediate and
/// final results, organised by reduction level and by batch within that level.
/// </summary>
public interface IStorage
{
/// <summary>
/// Records a single result value under the given batch of the given level.
/// </summary>
/// <param name="batchId">Identifier of the batch (within <paramref name="level"/>) the value belongs to.</param>
/// <param name="value">The result to store.</param>
/// <param name="level">
/// Reduction level of the batch. <see cref="MapReduceTask"/> writes its first pass at
/// level 0 and the output of every further reduce pass one level higher.
/// </param>
void Store(int batchId, object value, int level);
/// <summary>
/// Returns the batches stored at <paramref name="level"/>; each inner sequence holds
/// the values stored for one batch.
/// </summary>
/// <param name="level">The reduction level to read.</param>
/// <remarks>
/// What happens for a level nothing was stored at is up to the implementation;
/// the <see cref="Storage"/> class throws an InvalidOperationException in that case.
/// </remarks>
IEnumerable<IEnumerable<object>> GetBatchesFor(int level);
}
}
16 changes: 16 additions & 0 deletions LinqExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.Collections.Generic;
using System.Linq;

namespace MapReduce
{
/// <summary>
/// LINQ helpers used by the map/reduce sample.
/// </summary>
public static class LinqExtensions
{
    /// <summary>
    /// Splits a sequence into consecutive partitions of at most
    /// <paramref name="partitionSize"/> items, preserving the original order.
    /// The last partition may be smaller than the others.
    /// </summary>
    /// <typeparam name="T">Type of the items in the sequence.</typeparam>
    /// <param name="instance">The sequence to split.</param>
    /// <param name="partitionSize">Maximum number of items per partition; must be at least 1.</param>
    /// <returns>
    /// A lazily evaluated sequence of partitions. An empty input yields no partitions.
    /// </returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="instance"/> is null.</exception>
    /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="partitionSize"/> is less than 1.</exception>
    public static IEnumerable<IEnumerable<T>> Partition<T>(this IEnumerable<T> instance, int partitionSize)
    {
        // Validate here rather than inside the query: the query is deferred, so a size of
        // zero would otherwise only fail (with a DivideByZeroException) once the caller
        // starts enumerating, far away from the faulty call.
        if (instance == null)
            throw new System.ArgumentNullException("instance");
        if (partitionSize < 1)
            throw new System.ArgumentOutOfRangeException("partitionSize", partitionSize,
                "Partition size must be at least 1.");

        return instance
            // Pair every item with its position so it can be assigned to a bucket.
            .Select((value, index) => new {Index = index, Value = value})
            // Integer division maps positions 0..size-1 to bucket 0, size..2*size-1 to bucket 1, ...
            .GroupBy(i => i.Index/partitionSize)
            // Drop the positions again and expose only the original values.
            .Select(i => i.Select(i2 => i2.Value));
    }
}
}
61 changes: 61 additions & 0 deletions MapReduce.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">x86</Platform>
<ProductVersion>8.0.30703</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{188598A0-EA82-4AAA-A91A-D0B5EFEB3D6D}</ProjectGuid>
<OutputType>Exe</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>MapReduce</RootNamespace>
<AssemblyName>MapReduce</AssemblyName>
<TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
<TargetFrameworkProfile>Client</TargetFrameworkProfile>
<FileAlignment>512</FileAlignment>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">
<PlatformTarget>x86</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x86' ">
<PlatformTarget>x86</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="IStorage.cs" />
<Compile Include="LinqExtensions.cs" />
<Compile Include="MapReduceTask.cs" />
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Storage.cs" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>
45 changes: 45 additions & 0 deletions MapReduceTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System.Collections.Generic;
using System.Linq;

namespace MapReduce
{
/// <summary>
/// Runs a hierarchical map/reduce: the source is mapped and reduced in batches
/// (level 0), then the stored batches are repeatedly re-reduced in groups, each
/// pass writing to the next level, until a pass had at most one group to process.
/// </summary>
public class MapReduceTask
{
    /// <summary>
    /// Number of source items per level-0 batch and number of stored batches merged
    /// per re-reduce step. Must be at least 1; merging several batches needs 2 or more.
    /// </summary>
    public int BatchSize;

    /// <summary>Projection applied to every batch of source items.</summary>
    public IndexingFunc Map;

    /// <summary>
    /// Aggregation applied to the mapped items and, again, to already reduced results.
    /// </summary>
    public IndexingFunc Reduce;

    /// <summary>The items to process.</summary>
    public IEnumerable<object> Source;

    /// <summary>Where the results of every level are written to and read back from.</summary>
    public IStorage Storage;

    /// <summary>
    /// Executes the task. The results of the last pass are stored one level above the
    /// last level that was read. If nothing is produced for a level, the task stops
    /// instead of querying the storage for a level that was never written.
    /// </summary>
    /// <exception cref="System.InvalidOperationException">
    /// A required member is not set, <see cref="BatchSize"/> is less than 1, or
    /// <see cref="BatchSize"/> is 1 and a pass could not shrink the number of batches.
    /// </exception>
    public void Execute()
    {
        EnsureConfigured();

        // Level 0: map + reduce every batch of source items.
        var batchId = 0;
        var anythingStored = false;
        foreach (var batch in Source.Partition(BatchSize))
        {
            // Level-0 batch ids start at 1; kept as-is because they are handed to the storage.
            batchId++;
            if (StoreResults(Reduce(Map(batch)), batchId, 0))
                anythingStored = true;
        }

        var level = 0;
        int numOfBatchesInLevel;
        do
        {
            // Nothing was written at this level (empty source, or a reduce that produced
            // no output), so there is nothing left to combine.
            if (anythingStored == false)
                return;

            numOfBatchesInLevel = 0;
            var batchesWritten = 0;
            batchId = 0;
            foreach (var batchOfBatches in Storage.GetBatchesFor(level).Partition(BatchSize))
            {
                numOfBatchesInLevel += 1;
                // Flatten the grouped batches and reduce them as a single input.
                var reduced = Reduce(batchOfBatches.SelectMany(objects => objects));
                if (StoreResults(reduced, batchId, level + 1))
                    batchesWritten++;
                batchId++;
            }

            // With a batch size of 1 every group holds exactly one batch, so a pass that
            // writes as many batches as it read makes no progress and would repeat forever.
            if (BatchSize == 1 && numOfBatchesInLevel > 1 && batchesWritten == numOfBatchesInLevel)
                throw new System.InvalidOperationException(
                    "BatchSize must be at least 2 to combine more than one batch.");

            anythingStored = batchesWritten > 0;
            level += 1;
        } while (numOfBatchesInLevel > 1);
    }

    // Fails fast with a descriptive error when the task is not fully configured.
    private void EnsureConfigured()
    {
        if (Map == null)
            throw new System.InvalidOperationException("Map must be set before calling Execute.");
        if (Reduce == null)
            throw new System.InvalidOperationException("Reduce must be set before calling Execute.");
        if (Source == null)
            throw new System.InvalidOperationException("Source must be set before calling Execute.");
        if (Storage == null)
            throw new System.InvalidOperationException("Storage must be set before calling Execute.");
        if (BatchSize < 1)
            throw new System.InvalidOperationException("BatchSize must be at least 1.");
    }

    // Stores every result under the given batch and level; returns whether anything was stored.
    private bool StoreResults(IEnumerable<object> results, int batchId, int level)
    {
        var stored = false;
        foreach (var result in results)
        {
            Storage.Store(batchId, result, level);
            stored = true;
        }
        return stored;
    }
}
}
60 changes: 60 additions & 0 deletions Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.Linq;

namespace MapReduce
{
/// <summary>
/// Signature shared by the map and the reduce step of a <see cref="MapReduceTask"/>:
/// takes a sequence of dynamically typed items and returns a sequence of dynamically
/// typed results.
/// </summary>
/// <param name="source">The items to transform.</param>
/// <returns>The transformed items.</returns>
public delegate IEnumerable<dynamic> IndexingFunc(IEnumerable<dynamic> source);

/// <summary>
/// Console sample: sums the number of comments per blog with a batched map/reduce
/// and prints the final aggregation.
/// </summary>
internal class Program
{
    /// <summary>
    /// Builds the sample data, runs the map/reduce task over it and writes the
    /// final results to the console.
    /// </summary>
    private static void Main()
    {
        var storageService = new Storage();

        // Map: project each post to its blog id and comment count.
        IndexingFunc map = docs => docs.Select(post => new
        {
            post.blog_id,
            comments_length = post.comments
        });

        // Reduce: sum the comment counts per blog. The output has the same shape as the
        // input, which is what allows it to be applied again to already reduced results.
        IndexingFunc reduce = results => results
            .GroupBy(agg => agg.blog_id)
            .Select(g => new
            {
                blog_id = g.Key,
                comments_length = g.Sum(x => x.comments_length)
            });

        var posts = new[]
        {
            new {blog_id = 3, comments = 3},
            new {blog_id = 5, comments = 4},
            new {blog_id = 6, comments = 6},
            new {blog_id = 7, comments = 1},
            new {blog_id = 3, comments = 3},
            new {blog_id = 3, comments = 5},
            new {blog_id = 2, comments = 8},
            new {blog_id = 4, comments = 3},
            new {blog_id = 5, comments = 2},
            new {blog_id = 3, comments = 3},
            new {blog_id = 5, comments = 1}
        };

        var mapReduceTask = new MapReduceTask();
        mapReduceTask.Map = map;
        mapReduceTask.Reduce = reduce;
        mapReduceTask.BatchSize = 3;
        mapReduceTask.Storage = storageService;
        mapReduceTask.Source = posts;
        mapReduceTask.Execute();

        // 11 posts in batches of 3 give 4 batches at level 0, which are merged into
        // 2 batches at level 1 and finally into 1 batch at level 2: the final result.
        var finalResults = storageService.GetBatchesFor(2).SelectMany(batch => batch);
        foreach (var result in finalResults)
        {
            Console.WriteLine(result);
        }
    }
}
}
39 changes: 39 additions & 0 deletions Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System.Reflection;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.

[assembly: AssemblyTitle("MapReduce")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Microsoft")]
[assembly: AssemblyProduct("MapReduce")]
[assembly: AssemblyCopyright("Copyright © Microsoft 2010")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.

[assembly: ComVisible(false)]

// The following GUID is for the ID of the typelib if this project is exposed to COM

[assembly: Guid("ad83a30d-06ae-4ee0-b3ea-6a549d7eb104")]

// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]

[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
36 changes: 36 additions & 0 deletions Storage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

namespace MapReduce
{
/// <summary>
/// In-memory <see cref="IStorage"/> implementation that keeps every stored value
/// grouped by batch and remembers which batches belong to which level.
/// </summary>
/// <remarks>
/// The lookups are concurrent dictionaries, but the lists and sets stored in them are
/// modified without any synchronisation, so concurrent calls to
/// <see cref="Store"/> are not safe.
/// </remarks>
public class Storage : IStorage
{
    // Values of every batch, keyed by "<level>.<batchId>".
    private readonly ConcurrentDictionary<string, List<object>> resultsByBatchId =
        new ConcurrentDictionary<string, List<object>>();

    // For every level, the keys (as used in resultsByBatchId) of the batches stored at it.
    private readonly ConcurrentDictionary<int, HashSet<string>> resultsByLevel =
        new ConcurrentDictionary<int, HashSet<string>>();

    #region IStorage Members

    /// <summary>
    /// Appends <paramref name="value"/> to the batch identified by
    /// <paramref name="level"/> and <paramref name="batchId"/>, creating the batch
    /// and the level on first use.
    /// </summary>
    /// <param name="batchId">Identifier of the batch within the level.</param>
    /// <param name="value">The value to append.</param>
    /// <param name="level">The level the batch belongs to.</param>
    public void Store(int batchId, object value, int level)
    {
        // The level is part of the key so equal batch ids on different levels do not collide.
        var batchKey = level + "." + batchId;

        // Factory overloads: the list / set is only allocated when the entry is missing.
        var bag = resultsByBatchId.GetOrAdd(batchKey, key => new List<object>());
        var batches = resultsByLevel.GetOrAdd(level, key => new HashSet<string>());

        batches.Add(batchKey);
        bag.Add(value);
    }

    /// <summary>
    /// Returns the batches stored at <paramref name="level"/>. The result is evaluated
    /// lazily over the live internal collections.
    /// </summary>
    /// <param name="level">The level to read.</param>
    /// <returns>One sequence of values per batch stored at the level.</returns>
    /// <exception cref="InvalidOperationException">Nothing was ever stored at <paramref name="level"/>.</exception>
    public IEnumerable<IEnumerable<object>> GetBatchesFor(int level)
    {
        HashSet<string> batchesIds;
        if (resultsByLevel.TryGetValue(level, out batchesIds) == false)
            throw new InvalidOperationException("Could not find level: " + level);
        return batchesIds.Select(batchId => resultsByBatchId[batchId]);
    }

    #endregion
}
}

0 comments on commit 0c25800

Please sign in to comment.