# Matrices in three columns

Representing a matrix with Spark / Map / Reduce.

In [12]:
from jyquickhelper import add_notebook_menu
add_notebook_menu()

This notebook implements a matrix product with Spark. Spark, like SQL, does not handle a variable number of columns well, so the first step is to turn each $I\times J$ matrix into a three-column table $(i,j,coefficient)$.
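
A minimal sketch in plain Python (no Spark), assuming the matrix is given as a list of rows, of the conversion to the $(i, j, coefficient)$ format and back. The names `to_triplets` and `from_triplets` are hypothetical helpers for illustration only.

```python
def to_triplets(matrix):
    """Convert a dense matrix (list of rows) into (i, j, coefficient) triplets."""
    return [(i, j, value)
            for i, row in enumerate(matrix)
            for j, value in enumerate(row)]


def from_triplets(triplets, n_rows, n_cols):
    """Rebuild the dense matrix; the order of the triplets does not matter."""
    matrix = [[0.0] * n_cols for _ in range(n_rows)]
    for i, j, value in triplets:
        matrix[i][j] = value
    return matrix


m = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
t = to_triplets(m)
print(t[:3])  # [(0, 0, 1.0), (0, 1, 2.0), (1, 0, 3.0)]
```

Whatever the width of the original matrix, the result always has exactly three columns; only the number of rows grows ($I \times J$ rows).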

## Creating random matrices

In [13]:
from numpy.random import rand
rnd1 = rand(10,10)
rnd2 = rand(10, 2)
rnd1 @ rnd2

array([[ 3.27789828,  2.856186  ],
       [ 2.71398152,  2.52376866],
       [ 3.75095714,  3.28095367],
       [ 2.90449967,  2.06579867],
       [ 2.88392467,  1.47480051],
       [ 4.37044889,  3.2399857 ],
       [ 3.28907774,  2.76350336],
       [ 3.16622761,  2.98022915],
       [ 4.16003216,  3.90609813],
       [ 3.6105953 ,  2.67888509]])

In [14]:
import pandas
df1 = pandas.DataFrame(rnd1)
df2 = pandas.DataFrame(rnd2)
df2

|   | 0 | 1 |
|---|---|---|
| 0 | 0.895028 | 0.166102 |
| 1 | 0.112418 | 0.045979 |
| 2 | 0.812159 | 0.90169 |
| 3 | 0.194307 | 0.932745 |
| 4 | 0.642411 | 0.049303 |
| 5 | 0.380525 | 0.948708 |
| 6 | 0.727785 | 0.268102 |
| 7 | 0.915483 | 0.719258 |
| 8 | 0.72896 | 0.825541 |
| 9 | 0.968547 | 0.393226 |


In [15]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=False)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=False)

## Converting a matrix to the Spark format

When processing is distributed with Map/Reduce, one cannot rely on the order in which the lines are processed. The simplest approach is to store this information (the row index) on each line rather than trying to recover it afterwards.
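
To illustrate why the index must travel with each line, here is a local simulation, assuming lines in the tab-separated format written by `to_csv(sep="\t", index=True)`. The lines are processed in a random order, as a distributed job might; `parse_line` is a hypothetical stand-in for the parsing function used below.

```python
import random

# Tab-separated lines: first field is the row index, then the coefficients.
lines = ["0\t1.5\t2.5", "1\t3.5\t4.5", "2\t5.5\t6.5"]


def parse_line(line):
    """Turn one line into (i, j, value) triplets, using only that line."""
    fields = line.split("\t")
    i = int(fields[0])
    return [(i, j, float(v)) for j, v in enumerate(fields[1:])]


# Simulate a distributed run: lines are processed in an arbitrary order.
shuffled = lines[:]
random.shuffle(shuffled)
triplets = [t for line in shuffled for t in parse_line(line)]

# Each triplet carries its own row index, so sorting recovers the matrix
# no matter which order the lines were processed in.
print(sorted(triplets)[:2])  # [(0, 0, 1.5), (0, 1, 2.5)]
```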

In [16]:
df1.to_csv("rnd1.txt", sep="\t", header=None, index=True)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=True)

In [17]:
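def process_mat_row(row):
    # A line looks like "i\tv0\tv1...": the row index, then the coefficients.
    values = row.split("\t")
    index = int(values[0])
    values = [float(v) for v in values[1:]]
    # One (i, j, coefficient) triplet per coefficient of the row.
    return [[index, j, v] for j, v in enumerate(values)]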

In [21]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [23]:
mat1 = sc.textFile("rnd1.txt")
new_mat1 = mat1.flatMap(process_mat_row)
new_mat1.take(12)

In [10]:
mat2 = sc.textFile("rnd2.txt")
new_mat2 = mat2.flatMap(process_mat_row)
new_mat2.take(12)


## Produit matriciel

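First, join the two matrices with the [join](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.join) method. The join key must be the first element of each pair: entries of the first matrix are keyed by their column index $j$, entries of the second matrix by their row index $j$.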
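
The join matches the shared index $j$ in $C_{ik} = \sum_j A_{ij} B_{jk}$. A plain-Python sketch of that step: `local_join` is a hypothetical helper that mimics the shape of `RDD.join` output, one `(key, (left_value, right_value))` per matching pair, and the two small matrices are made-up values.

```python
from collections import defaultdict


def key_ij(row):
    # (i, j, v) -> key i, value (j, v): the second matrix, keyed by its row
    return row[0], (row[1], row[2])


def key_ji(row):
    # (i, j, v) -> key j, value (i, v): the first matrix, keyed by its column
    return row[1], (row[0], row[2])


def local_join(left, right):
    """Inner join of two lists of (key, value) pairs, shaped like RDD.join:
    one (key, (left_value, right_value)) for every matching pair."""
    by_key = defaultdict(list)
    for k, v in right:
        by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in by_key[k]]


A = [(0, 0, 1.0), (0, 1, 2.0), (1, 0, 3.0), (1, 1, 4.0)]  # 2x2 matrix
B = [(0, 0, 5.0), (1, 0, 6.0)]                             # 2x1 matrix
joined = local_join([key_ji(r) for r in A], [key_ij(r) for r in B])
# Each record pairs A[i, j] with B[j, k] for the same j.
print(sorted(joined))
```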

In [11]:
def key_ij(row):
    return row[0], (row[1], row[2])
def key_ji(row):
    return row[1], (row[0], row[2])
mat_join = new_mat1.map(key_ji).join(new_mat2.map(key_ij))
mat_join.take(12)


Next, compute the product $a_{ij} b_{jk}$ for each joined pair.

In [10]:
def produit_matriciel(row):
    # row = (j, ((i, a_ij), (k, b_jk))): j is the join key shared by both entries
    j, ((i, v1), (k, v2)) = row
    return i, k, v1 * v2
produit = mat_join.map(produit_matriciel)
produit.take(12)

[(0, 0, 0.7695998816642311),
 (0, 1, 0.4141321695398561),
 (1, 0, 0.716492946729951),
 (1, 1, 0.3855546051379675),
 (2, 0, 0.7959399119238495),
 (2, 1, 0.4283060982748405),
 (3, 0, 0.5584706635767238),
 (3, 1, 0.30052066410308675),
 (4, 0, 0.5765473124557496),
 (4, 1, 0.31024795486369955),
 (5, 0, 0.5452839068856777),
 (5, 1, 0.29342469087366296)]

All that remains is to aggregate with [reduceByKey](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey). The documentation provides an example that is easy to adapt. It also states: *Merge the values for each key using an associative and commutative reduce function.* Why does it specify **associative and commutative**? Because then the result does not depend on the order in which the aggregation is performed, so aggregation can start before all the values associated with a key have been gathered.

* *Case 1:* [groupBy](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupBy) + aggregation that starts once all the values have been grouped
* *Case 2:* [reduceByKey](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) + aggregation that starts as soon as the first values are grouped

Case 2 moves less data, since partial results are combined before being exchanged. Case 1 is only feasible if each key does not have too many values to aggregate. Conveniently, addition is associative and commutative, so case 2 applies here.
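
The difference between the two cases can be simulated locally. This is a sketch, not Spark's implementation: the partitions and product values are hypothetical, `group_then_sum` plays the role of case 1 and `reduce_by_key` of case 2, which combines values inside each partition before merging the partial results.

```python
from collections import defaultdict
from functools import reduce
from operator import add

# Hypothetical products ((i, k), a_ij * b_jk) split over two partitions.
partitions = [
    [((0, 0), 5.0), ((1, 0), 15.0), ((0, 0), 12.0)],
    [((1, 0), 24.0)],
]


def group_then_sum(parts):
    """Case 1: gather every value of a key, then aggregate."""
    groups = defaultdict(list)
    for part in parts:
        for key, value in part:
            groups[key].append(value)
    return {key: reduce(add, values) for key, values in groups.items()}


def reduce_by_key(parts, func):
    """Case 2: combine inside each partition first, then merge the partial
    results. Only safe if func is associative and commutative, because the
    merge order is not fixed."""
    partials = []
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)
    result = {}
    for local in partials:
        for key, value in local.items():
            result[key] = func(result[key], value) if key in result else value
    return result


print(reduce_by_key(partitions, add))  # {(0, 0): 17.0, (1, 0): 39.0}
print(group_then_sum(partitions))      # same values
```

In case 2 each partition sends at most one value per key, instead of every individual product.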

In [11]:
from operator import add
final = produit.map(lambda row: ((row[0], row[1]), row[2])).reduceByKey(add)
aslist = final.collect()
aslist.sort()
aslist

[((0, 0), 3.164850046348241),
 ((0, 1), 4.559445715024405),
 ((1, 0), 2.526790300879841),
 ((1, 1), 2.8702356426731646),
 ((2, 0), 2.5456884797140247),
 ((2, 1), 3.04477187797909),
 ((3, 0), 1.9795588879982224),
 ((3, 1), 2.623457980006711),
 ((4, 0), 2.6288364080082656),
 ((4, 1), 4.111449492587058),
 ((5, 0), 2.788268947579333),
 ((5, 1), 3.498463270496026),
 ((6, 0), 2.2139520348118236),
 ((6, 1), 2.903554407097735),
 ((7, 0), 2.232501586646612),
 ((7, 1), 3.213223607913268),
 ((8, 0), 2.42321286851472),
 ((8, 1), 3.4138285975924623),
 ((9, 0), 2.1386203274215574),
 ((9, 1), 3.3888181357124005)]

For comparison, the product computed directly with numpy:

In [12]:
rnd1 @ rnd2

array([[ 3.16485005,  4.55944572],
       [ 2.5267903 ,  2.87023564],
       [ 2.54568848,  3.04477188],
       [ 1.97955889,  2.62345798],
       [ 2.62883641,  4.11144949],
       [ 2.78826895,  3.49846327],
       [ 2.21395203,  2.90355441],
       [ 2.23250159,  3.21322361],
       [ 2.42321287,  3.4138286 ],
       [ 2.13862033,  3.38881814]])

## Same algorithm with Spark DataFrames

We need the equivalent of a [flatMap](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.flatMap). One way to do this is to create columns of a composite type (an array, a struct). The rows are then multiplied, one per array element, with the function [explode](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.functions.explode).
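
To make the row multiplication concrete, here is a plain-Python analogue of `select("index", posexplode("x"))`, assuming rows of the form `(index, [x0, x1, ...])`. `posexplode_rows` is a hypothetical name; `posexplode` is used rather than `explode` because `explode` alone returns only the element and would lose the column position $j$.

```python
def posexplode_rows(rows):
    """Each (index, [x0, x1, ...]) row becomes one (index, pos, col) row
    per array element, like select("index", posexplode("x"))."""
    return [(index, pos, col)
            for index, values in rows
            for pos, col in enumerate(values)]


rows = [(0, [0.5, 0.25]), (1, [0.75, 1.0])]
print(posexplode_rows(rows))
# [(0, 0, 0.5), (0, 1, 0.25), (1, 0, 0.75), (1, 1, 1.0)]
```

With 10 rows holding arrays of 10 elements, this yields 100 rows of 3 columns.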

In [13]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
schema = ["index"] + ["c%d" % i for i in range(1, 11)]
mat1 = spark.createDataFrame(pandas.read_csv("rnd1.txt", header=None, sep="\t"), schema=schema)

In [14]:
mat1.printSchema()

root
 |-- index: long (nullable = true)
 |-- c1: double (nullable = true)
 |-- c2: double (nullable = true)
 |-- c3: double (nullable = true)
 |-- c4: double (nullable = true)
 |-- c5: double (nullable = true)
 |-- c6: double (nullable = true)
 |-- c7: double (nullable = true)
 |-- c8: double (nullable = true)
 |-- c9: double (nullable = true)
 |-- c10: double (nullable = true)



In [15]:
schema = ["index"] + ["c%d" % i for i in range(1, 3)]
mat2 = spark.createDataFrame(pandas.read_csv("rnd2.txt", header=None, sep="\t"), schema=schema)

In [16]:
mat2.printSchema()

root
 |-- index: long (nullable = true)
 |-- c1: double (nullable = true)
 |-- c2: double (nullable = true)



We will need some of the following functions and types:

* [explode](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.functions.explode), [posexplode](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.functions.posexplode), [array](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.functions.array), [alias](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.alias)
* [StructType](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.types.StructType), [StructField](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.types.StructField)
* [ArrayType](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.types.ArrayType)
* [DoubleType](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.types.DoubleType), [IntegerType](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.types.IntegerType)

I recommend the [FloatType](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.types.FloatType) type, which takes half the space at the cost of a lower precision that is still sufficient in most cases.
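
The space/precision trade-off can be checked with the standard `struct` module, assuming FloatType and DoubleType correspond to 32-bit and 64-bit IEEE 754 floats. This only illustrates the storage formats, not Spark itself.

```python
import struct

# 32-bit float (FloatType) vs 64-bit double (DoubleType), in bytes.
print(struct.calcsize("<f"), struct.calcsize("<d"))  # 4 8

# Round-trip a value through 32-bit storage to see the precision lost.
x = 0.1
x32 = struct.unpack("<f", struct.pack("<f", x))[0]
print(x32)           # 0.10000000149011612
print(abs(x32 - x))  # on the order of 1e-9
```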

In [17]:
from pyspark.sql.types import ArrayType, StructField, StructType, DoubleType, IntegerType
from pyspark.sql.functions import explode, posexplode, array
from pyspark.sql import Row

In [18]:
cols = ["c%d" % i for i in range(1, 11)]
mat1_array = mat1.select(mat1.index, array(*cols).alias("x"))
mat1_array.printSchema()

root
 |-- index: long (nullable = true)
 |-- x: array (nullable = false)
 |    |-- element: double (containsNull = true)



In [19]:
mat1_exploded = mat1_array.select("index", posexplode("x"))
mat1_exploded.printSchema()

root
 |-- index: long (nullable = true)
 |-- pos: integer (nullable = false)
 |-- col: double (nullable = true)



In [20]:
mat1.toPandas().shape, mat1_exploded.toPandas().shape

((10, 11), (100, 3))

We repeat the same process for the other matrix.

In [21]:
cols = ["c%d" % i for i in range(1, 3)]
mat2_array = mat2.select(mat2.index, array(*cols).alias("x"))
mat2_exploded = mat2_array.select("index", posexplode("x"))

All that remains is to compute the product with the [join](https://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join) method, renaming the columns beforehand to avoid ambiguities.
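
The join-then-aggregate logic of the next cells can be sketched in plain Python on the exploded `(index, pos, col)` layout: join on `pos == index2`, multiply `col * col2`, then group by `(index, pos2)` and sum. The 2x2 matrices are hypothetical values chosen so the result is easy to check by hand.

```python
from collections import defaultdict

# Exploded matrices in the (index, pos, col) layout produced by posexplode.
# Hypothetical values: A = [[1, 2], [3, 4]], B = [[5, 6], [7, 8]].
a_exploded = [(0, 0, 1.0), (0, 1, 2.0), (1, 0, 3.0), (1, 1, 4.0)]
b_exploded = [(0, 0, 5.0), (0, 1, 6.0), (1, 0, 7.0), (1, 1, 8.0)]

# Index B by its row (index2 in the notebook), keeping (pos2, col2).
b_by_index2 = defaultdict(list)
for index2, pos2, col2 in b_exploded:
    b_by_index2[index2].append((pos2, col2))

# Join on A.pos == B.index2, multiply, then group by (index, pos2) and sum.
sums = defaultdict(float)
for index, pos, col in a_exploded:
    for pos2, col2 in b_by_index2[pos]:
        sums[(index, pos2)] += col * col2

print(dict(sorted(sums.items())))
# {(0, 0): 19.0, (0, 1): 22.0, (1, 0): 43.0, (1, 1): 50.0}
```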

In [22]:
mat2_exp2 = mat2_exploded.withColumnRenamed("index", "index2") \
                         .withColumnRenamed("pos", "pos2") \
                         .withColumnRenamed("col", "col2")
produit = mat1_exploded.join(mat2_exp2, mat1_exploded.pos == mat2_exp2.index2)

In [23]:
produit.printSchema()

root
 |-- index: long (nullable = true)
 |-- pos: integer (nullable = false)
 |-- col: double (nullable = true)
 |-- index2: long (nullable = true)
 |-- pos2: integer (nullable = false)
 |-- col2: double (nullable = true)



In [24]:
produit.toPandas().head()

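|   | index | pos | col | index2 | pos2 | col2 |
|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0.964429 | 0 | 0 | 0.797985 |
| 1 | 0 | 0 | 0.964429 | 0 | 1 | 0.429407 |
| 2 | 1 | 0 | 0.897878 | 0 | 0 | 0.797985 |
| 3 | 1 | 0 | 0.897878 | 0 | 1 | 0.429407 |
| 4 | 2 | 0 | 0.997437 | 0 | 0 | 0.797985 |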


In [25]:
prod = produit.select(produit.index.alias("i"), produit.pos2.alias("j"),
                         (produit.col * produit.col2).alias("val"))
final = prod.groupby("i", "j").sum("val")

In [26]:
final.printSchema()

root
 |-- i: long (nullable = true)
 |-- j: integer (nullable = false)
 |-- sum(val): double (nullable = true)



In [27]:
df = final.toPandas()

In [28]:
df.sort_values(["i", "j"]).head()

|    | i | j | sum(val) |
|---|---|---|---|
| 7 | 0 | 0 | 3.16485 |
| 10 | 0 | 1 | 4.559446 |
| 18 | 1 | 0 | 2.52679 |
| 3 | 1 | 1 | 2.870236 |
| 6 | 2 | 0 | 2.545688 |


In [29]:
df.shape

(20, 3)