In [179]:
# Spark master URL; written to the "spark.master" entry of the SparkConf built below.
MASTER = "local"
# Value for "spark.executor.cores" (passed to SparkConf.set as a string).
NUM_PROCESSORS = "4"
# Value for "spark.executor.instances" (passed to SparkConf.set as a string).
NUM_EXECUTORS = "1"
# NOTE(review): not referenced anywhere in the visible notebook — confirm it is still needed.
NUM_PARTITIONS = 40

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
import pyspark.sql.functions as psf
import json



In [2]:
# Spark configuration for the whole notebook. The pairs are applied in the
# order listed; setAll() returns the SparkConf itself, so the cell still
# displays the configured object.
conf = SparkConf()

conf.setAll([
    ("spark.app.name", "one_part_data"),
    ("spark.master", MASTER),
    ("spark.executor.cores", NUM_PROCESSORS),
    ("spark.executor.instances", NUM_EXECUTORS),
    ("spark.executor.memory", "15g"),
    ("spark.locality.wait", "0"),
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ("spark.kryoserializer.buffer.max", "2000"),
    ("spark.executor.heartbeatInterval", "6000s"),
    ("spark.network.timeout", "10000000s"),
    ("spark.shuffle.spill", "true"),
    ("spark.driver.memory", "15g"),
    ("spark.driver.maxResultSize", "15g"),
])

<pyspark.conf.SparkConf at 0x7f4aa8861a20>

In [3]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession from `conf`.
# NOTE(review): master() and appName() are applied after config(conf=conf),
# so they presumably take precedence over the "spark.master" and
# "spark.app.name" values stored in `conf` — confirm which ones are intended.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .master("local[*]")
    .appName("SparkByExamples.com")
    .getOrCreate()
)
spark

22/12/14 11:25:24 WARN Utils: Your hostname, yagor-pc resolves to a loopback address: 127.0.1.1; using 192.168.0.107 instead (on interface enp7s0)
22/12/14 11:25:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/14 11:25:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# spark.stop()  -- disabled on purpose; uncomment (with the call parentheses) to shut the session down

In [21]:
# Explicit schema with two integer columns, "user" and "film".
schema = StructType([
    StructField("user", IntegerType()),
    StructField("film", IntegerType()),
])

In [7]:
# NOTE(review): absolute, machine-specific path; despite the variable name it points at a single CSV file.
folderPath = "/home/yagor/Рабочий стол/mipt/spark/train_0_part_data.csv"
# Parse the CSV with the explicit schema; no header option is set.
df = spark.read.csv(folderPath, schema=schema)
df.show()

+----+-----+
|user| film|
+----+-----+
|   0|16981|
|   0|23846|
|   0|27947|
|   0|31189|
|   0|31713|
|   0|32947|
|   0|33598|
|   0|36763|
|   0|38012|
|   0|39632|
|   0|39826|
|   0|40069|
|   0|40177|
|   0|42118|
|   0|43282|
|   0|44186|
|   0|44439|
|   0|49093|
|   0|49555|
|   0|49739|
+----+-----+
only showing top 20 rows



In [8]:
# Check that the explicit schema was applied to the loaded frame.
df.schema

StructType([StructField('user', IntegerType(), True), StructField('film', IntegerType(), True)])

In [9]:
# Total number of (user, film) rows; this runs a Spark job over the whole input.
df.count()

                                                                                

116468778

In [10]:
# Number of rows per film; the aggregated column is named "count".
values = df.groupBy('film').count()
values.show()



+------+-----+
|  film|count|
+------+-----+
|373523|  776|
|417183|  585|
|108460| 2013|
|197890| 4263|
|203429|10146|
|220674|11830|
|200625|12263|
|591532|13334|
|628058| 1709|
|259276| 6027|
| 42468|  210|
| 77422|   45|
|180155| 7056|
|188834|  838|
|173914|13728|
|225262| 2463|
|396948|19929|
|170862|12454|
|193228| 2586|
|716230| 1137|
+------+-----+
only showing top 20 rows



                                                                                

### Отбор 500 самых часто просматриваемых/популярных фильмов

In [16]:
# Ascending sort (the default), so the films with the largest counts end up last.
values_sorted = values.sort('count')


In [17]:
# Shows the first rows of the ascending order, i.e. the smallest counts.
values_sorted.show()



+------+-----+
|  film|count|
+------+-----+
|691830|    1|
|750322|    1|
|214115|    1|
|743340|    1|
|164126|    1|
|820116|    1|
|704723|    1|
|390441|    1|
|209485|    1|
| 90557|    1|
|197651|    1|
|417963|    1|
|851631|    1|
|314123|    1|
| 40330|    1|
|425197|    1|
|815069|    1|
|169572|    1|
|540722|    1|
|224450|    1|
+------+-----+
only showing top 20 rows



                                                                                

In [18]:
# tail(500) brings the last 500 rows to the driver as a Python list of Row objects;
# with the ascending sort these are the 500 largest counts.
values_sorted_part_list = values_sorted.tail(500)

                                                                                

In [19]:
# Turn the collected rows back into a Spark DataFrame with columns "film" and "count".
values_sorted_part = spark.createDataFrame(data = values_sorted_part_list, schema = ["film", "count"])

In [20]:
# Preview the selected films (still in ascending order of count).
values_sorted_part.show()

+------+-----+
|  film|count|
+------+-----+
|226442|16753|
|321489|16760|
|323232|16789|
|395107|16795|
|200249|16797|
|332231|16833|
|206272|16836|
|376603|16837|
|207079|16839|
|218918|16857|
|163617|16863|
|228717|16878|
|227173|16879|
|264081|16880|
|383534|16907|
|205458|16919|
|237383|16928|
|204533|16933|
|218533|16994|
|193148|17003|
+------+-----+
only showing top 20 rows



In [21]:
# Sanity check: the selection should contain exactly the 500 rows taken by tail(500).
values_sorted_part.count()

500

In [22]:
from pyspark.sql import functions as F

# One row per film, with the list of "user" values from all of that film's rows.
df2 = df.groupBy("film").agg(F.collect_list("user"))


In [23]:
# Number of distinct films (one grouped row per film).
df2.count()

                                                                                

547240

In [24]:
# Preview the per-film user lists; the aggregated column is named "collect_list(user)".
df2.show()



+----+--------------------+
|film|  collect_list(user)|
+----+--------------------+
|  28|[9151, 19312, 246...|
|  31|             [80062]|
|  34|            [105444]|
|  53|             [11458]|
|  65|[94699, 98105, 11...|
|  85|             [75891]|
| 101|[838, 10786, 1027...|
| 108|[35222, 77924, 13...|
| 115|      [30232, 38963]|
| 137|      [81750, 95773]|
| 183|[1495, 22656, 206...|
| 193|[3924, 31201, 553...|
| 210|[43302, 48541, 80...|
| 251|            [125673]|
| 255|[1813, 11939, 349...|
| 296|[3031, 29807, 390...|
| 321|[10018, 989, 1017...|
| 322|[11077, 504, 1188...|
| 362|             [38090]|
| 368|[10318, 6706, 695...|
+----+--------------------+
only showing top 20 rows



                                                                                

In [None]:
# Inspect one full row (show() truncates the user list).
df2.head(1)

                                                                                

[Row(film=12, collect_list(user)=[81397])]

In [38]:
# Inspect the last row of the grouped frame.
df2.tail(1)

                                                                                

[Row(film=855760, collect_list(user)=[3096, 7591, 11382, 17435, 46587, 54408, 55663, 60672, 67620, 68567, 74894, 93917, 100526, 101049, 106280, 109668, 110864, 111457, 116906, 125847, 130535])]

In [29]:
# Inner join on "film": keeps only the films present in values_sorted_part and adds their "count".
df3 = df2.join(values_sorted_part,'film','inner')

In [30]:
# Sanity check: one row per selected film is expected after the join.
df3.count()

                                                                                

500

In [31]:
# Preview the joined frame: film, user list and view count.
df3.show()



+------+--------------------+-----+
|  film|  collect_list(user)|count|
+------+--------------------+-----+
|120663|[9962, 9, 9964, 1...|28747|
|121002|[9962, 15, 9965, ...|17369|
|121207|[9960, 4, 9961, 9...|30448|
|125506|[7, 9961, 9, 9965...|20195|
|192238|[9961, 5, 9962, 6...|24406|
|198430|[9954, 16, 9962, ...|19761|
|199203|[9964, 1, 9972, 5...|20265|
|200249|[9963, 1, 9972, 3...|16797|
|205458|[9962, 33, 9964, ...|16919|
|214871|[9958, 6, 9963, 1...|23005|
|216953|[9958, 18, 9963, ...|19618|
|217062|[9958, 11, 9968, ...|18546|
|220905|[9958, 17, 9963, ...|21910|
|221341|[9958, 11, 9959, ...|25089|
|221766|[9955, 6, 9958, 8...|24090|
|223238|[9963, 1, 9970, 2...|23670|
|224090|[9955, 6, 9958, 8...|40848|
|224413|[9958, 1, 9959, 2...|33392|
|224507|[9954, 12, 9958, ...|18295|
|229578|[9958, 8, 9959, 1...|17245|
+------+--------------------+-----+
only showing top 20 rows



                                                                                

### TF-IDF (сходство фильмов на основе просмотров пользователей)

In [32]:
# Hash each film's user list into a 10000-dimensional term-frequency vector ("rawFeatures").
# NOTE(review): user ids above 10000 appear in the data, so distinct users presumably
# collide in the same hash bucket — confirm numFeatures is large enough for the intended similarity.
hashingTF = HashingTF(inputCol="collect_list(user)", outputCol="rawFeatures", numFeatures=10000, )
featurizedData = hashingTF.transform(df3)
# Mark the frame for caching; it is the input of the following feature steps.
featurizedData.cache()

DataFrame[film: int, collect_list(user): array<int>, count: bigint, rawFeatures: vector]

In [34]:
# Fit IDF on the hashed term frequencies and produce the "features" column that
# the Normalizer step reads. Without this step `rescaledData` is undefined when
# the notebook is run top to bottom (it was only defined in the "Other" section).
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
# Sanity check: still one row per selected film.
rescaledData.count()

500

### Compute L2 norm

In [35]:
from pyspark.ml.feature import Normalizer
# Scale every "features" vector to unit length (Normalizer's default p is 2) and store it in "norm",
# so that a dot product of two "norm" vectors is their cosine similarity.
normalizer = Normalizer(inputCol="features", outputCol="norm")
data = normalizer.transform(rescaledData)

### Compute matrix product:

In [36]:
# Python UDF returning the dot product of two vectors as a float.
dot_udf = psf.udf(lambda u, v: float(u.dot(v)), DoubleType())

# Self-join on i.film < j.film so that every unordered pair of films appears
# exactly once, then compute the dot product of their normalised vectors.
res = (
    data.alias("i")
    .join(data.alias("j"), psf.col("i.film") < psf.col("j.film"))
    .select(
        psf.col("i.film").alias("i"),
        psf.col("j.film").alias("j"),
        dot_udf("i.norm", "j.norm").alias("dot"),
    )
    .sort("i", "j")
)

In [37]:
# Preview the pairwise scores (ordered by i, then j).
res.show()



+------+------+------------------+
|     i|     j|               dot|
+------+------+------------------+
|106985|107800|0.7056756236927901|
|106985|108171|0.6663264787432368|
|106985|108239|0.7047961684423472|
|106985|108773|0.7101651181311384|
|106985|110836|0.7134403697891201|
|106985|111919|0.7219618446806243|
|106985|112226|0.7034818715750314|
|106985|112895|0.7247384373492936|
|106985|113480|0.7052136145358082|
|106985|114169|0.7430677575223128|
|106985|114231|0.7058251544013512|
|106985|114280|0.6651666663415046|
|106985|115668|0.6823146499468209|
|106985|116267|0.7313373313406346|
|106985|118205|0.7000104308700368|
|106985|119019|0.6982923189285579|
|106985|120621|0.7024982289960983|
|106985|120663|0.7404632371829023|
|106985|120975|0.6775225513998767|
|106985|121002|0.6783826197801929|
+------+------+------------------+
only showing top 20 rows



                                                                                

In [None]:
# NOTE(review): this cell was never executed (In [None]); sort() without any column
# is expected to raise, and the result is not assigned — candidate for removal.
res.sort()

In [39]:
# Number of film pairs; for 500 films with i < j this is 500 * 499 / 2 = 124750.
res.count()

                                                                                

124750

In [40]:
# Order by film i, then by ascending score, so each film's lowest scores come first.
res_sort = res.sort(['i', 'dot'])

In [41]:
# Preview the lowest-scoring pairs of the first film.
res_sort.show()



+------+------+------------------+
|     i|     j|               dot|
+------+------+------------------+
|106985|218329|0.5824472100541838|
|106985|228717|0.5840643942334456|
|106985|217972|0.5846528703121342|
|106985|218106|0.5892363744997995|
|106985|218533|0.5897074470367715|
|106985|237383|0.5899152191999293|
|106985|229578|0.5917382949394276|
|106985|214193|0.5919590249032748|
|106985|224166|0.5922217811794538|
|106985|231449|0.5926807081788006|
|106985|235865|0.5956655980776531|
|106985|225905|0.5974260528255337|
|106985|239845| 0.598413683439196|
|106985|227823|0.5995885435535463|
|106985|224507|0.5999072907858585|
|106985|227328|0.6001307660698709|
|106985|240442|0.6021419575633536|
|106985|233474|0.6027525033795953|
|106985|218918|0.6029044480699042|
|106985|235326|0.6040400690236306|
+------+------+------------------+
only showing top 20 rows



                                                                                

In [48]:

# Keep only the pairs whose score does not exceed the 0.586 threshold.
# NOTE(review): this keeps the LOWEST dot products; confirm that is the intended
# direction, since a higher cosine similarity normally means more similar films.
res_with_tresh = res.where(res["dot"] <= 0.586)

[Stage 120:==>     (11140 + 12) / 40000][Stage 121:>              (0 + 0) / 898]

[Stage 120:==>     (11200 + 13) / 40000][Stage 121:>              (0 + 0) / 898]

In [None]:
# Preview the pairs that passed the threshold.
res_with_tresh.show()



In [51]:
# Export the thresholded pairs as CSV with a header row. Spark writes a
# directory of part files at this path.
# NOTE(review): no save mode is set, so re-running presumably fails once the
# directory exists — confirm whether overwrite is wanted.
(
    res_with_tresh.write
    .option("header", "true")
    .csv("/home/yagor/Рабочий стол/mipt/spark/500res_new_tresh_58.csv")
)

                                                                                

### Other (not used)

In [18]:
# Write the thresholded pairs as a single partition (repartition(1)) with a header.
# NOTE(review): 'com.databricks.spark.csv' is the name of the external spark-csv data source;
# presumably the built-in "csv" format is meant on current Spark — confirm.
res_with_tresh.repartition(1).write.format('com.databricks.spark.csv').save("/home/yagor/Рабочий стол/mipt/spark/500res.csv", header = 'true')

                                                                                

In [None]:
# NOTE(review): never executed (In [None]). res_with_tresh is built with columns i, j and dot,
# so grouping by "film" / collecting "user" looks like a leftover; the next cell groups by "i" instead.
df2 = res_with_tresh.groupBy("film").agg(F.collect_list("user"))

In [23]:
# For each film i, collect the list of films j paired with it under the threshold.
res_with_tresh_ = res_with_tresh.groupBy("i").agg(F.collect_list("j"))

In [None]:
# Display the grouped frame's repr (never executed in the recorded run).
res_with_tresh_

In [None]:
# Write the thresholded pairs to a relative "res10000.csv" output directory (no header option).
res_with_tresh.write.csv('res10000.csv')

In [None]:
# NOTE(review): never executed (In [None]). The displayed `data` frame has no "dot" column
# (film, collect_list(user), rawFeatures, features, norm), so this filter presumably
# targets the pairwise result instead — confirm or remove.
data.filter('dot > 0').show()


In [17]:
# Same pairwise dot-product computation as in the main flow, displayed directly
# instead of being stored. Python UDF: dot product of two vectors as a float.
dot_udf = psf.udf(lambda u, v: float(u.dot(v)), DoubleType())
(
    data.alias("i")
    .join(data.alias("j"), psf.col("i.film") < psf.col("j.film"))
    .select(
        psf.col("i.film").alias("i"),
        psf.col("j.film").alias("j"),
        dot_udf("i.norm", "j.norm").alias("dot"),
    )
    .sort("i", "j")
    .show()
)

[Stage 18:>                                                         (0 + 1) / 1]

+---+----+---+
|  i|   j|dot|
+---+----+---+
|471| 833|0.0|
|471|1088|0.0|
|471|1238|0.0|
|471|1342|0.0|
|471|1580|0.0|
|471|1591|0.0|
|471|1829|0.0|
|471|2122|0.0|
|471|2866|0.0|
|471|3749|0.0|
|471|3794|0.0|
|471|3918|0.0|
|471|3997|0.0|
|471|4818|0.0|
|471|5156|0.0|
|471|5518|0.0|
|471|5803|0.0|
|471|6466|0.0|
|471|7253|0.0|
|471|7754|0.0|
+---+----+---+
only showing top 20 rows



                                                                                

In [26]:
# Preview every feature stage side by side: rawFeatures, features and norm.
data.show()

+----+--------------------+--------------------+--------------------+--------------------+
|film|  collect_list(user)|         rawFeatures|            features|                norm|
+----+--------------------+--------------------+--------------------+--------------------+
| 471|            [137995]|(10000,[1759],[1.0])|(10000,[1759],[3....|(10000,[1759],[1.0])|
| 833|[41326, 50660, 86...|(10000,[561,604,6...|(10000,[561,604,6...|(10000,[561,604,6...|
|1088|[14709, 15331, 16...|(10000,[15,63,239...|(10000,[15,63,239...|(10000,[15,63,239...|
|1238|[56843, 76606, 10...|(10000,[525,2412,...|(10000,[525,2412,...|(10000,[525,2412,...|
|1342|[15751, 35509, 37...|(10000,[437,1768,...|(10000,[437,1768,...|(10000,[437,1768,...|
|1580|[17427, 64685, 10...|(10000,[2502,6319...|(10000,[2502,6319...|(10000,[2502,6319...|
|1591|        [6321, 8162]|(10000,[3555,4757...|(10000,[3555,4757...|(10000,[3555,4757...|
|1829|[13375, 13780, 14...|(10000,[0,13,25,3...|(10000,[0,13,25,3...|(10000,[0,13,25,3...|

In [None]:
# NOTE(review): never executed (In [None]). The displayed `data` frame has no "age" column,
# so this looks like a pasted leftover — confirm or remove.
df_filtered=data.filter('age > 0')
df_filtered.show()

In [11]:
# Variant of the hashing step applied to df2 rather than df3.
hashingTF = HashingTF(inputCol="collect_list(user)", outputCol="rawFeatures", numFeatures=10000, )
featurizedData = hashingTF.transform(df2)
# Mark the frame for caching; it is reused by the IDF fit and transform.
featurizedData.cache()

DataFrame[film: int, collect_list(user): array<int>, rawFeatures: vector]

In [13]:
# Fit IDF on the hashed term frequencies and write the rescaled vectors to "features".
# The trailing comment records the variant without minDocFreq as the one that was "used".
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=1) #IDF(inputCol="rawFeatures", outputCol="features") used
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("film", "features").show()



+----+--------------------+
|film|            features|
+----+--------------------+
| 471|(10000,[1759],[3....|
| 833|(10000,[561,604,6...|
|1088|(10000,[15,63,239...|
|1238|(10000,[525,2412,...|
|1342|(10000,[437,1768,...|
|1580|(10000,[2502,6319...|
|1591|(10000,[3555,4757...|
|1829|(10000,[0,13,25,3...|
|2122|(10000,[947],[4.2...|
|2866|(10000,[3091,3254...|
|3749|(10000,[8392],[3....|
|3794|(10000,[1124,4715...|
|3918|(10000,[0,14,23,2...|
|3997|(10000,[253,732,1...|
|4818|(10000,[227,239,2...|
|5156|(10000,[1297,3881...|
|5518|(10000,[4125,4674...|
|5803|(10000,[5299,9318...|
|6466|(10000,[134,413,1...|
|7253|(10000,[681,2036,...|
+----+--------------------+
only showing top 20 rows



                                                                                

#### Compute L2 norm

In [14]:
from pyspark.ml.feature import Normalizer
# Same normalisation as in the main flow: "features" -> unit-length "norm" vectors.
normalizer = Normalizer(inputCol="features", outputCol="norm")
data = normalizer.transform(rescaledData)

#### Compute matrix product:

In [19]:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# Each film becomes an IndexedRow whose index is the film id and whose values are the
# dense (toArray) form of its "norm" vector; the rows are then converted to a BlockMatrix.
# NOTE(review): film ids are used as row indices, so the matrix presumably has as many
# rows as the largest film id — confirm this is affordable.
mat = IndexedRowMatrix(
    data.select("film", "norm")\
        .rdd.map(lambda row: IndexedRow(row.film, row.norm.toArray()))).toBlockMatrix()
# Matrix times its own transpose: entry (a, b) is the dot product of rows a and b.
dot = mat.multiply(mat.transpose())

                                                                                

## Test rec

In [13]:
# Read the whole CSV output directory written by the thresholded-pairs export,
# instead of one hard-coded part file: part-file names contain a per-write
# UUID, so the old path broke as soon as the export was re-run.
# No schema is given, so every column (i, j, dot) is read as a string.
folderPath = "/home/yagor/Рабочий стол/mipt/spark/500res_new_tresh_58.csv"
df = spark.read.option("header", "true").csv(folderPath)
df.show()

+------+------+------------------+
|     i|     j|               dot|
+------+------+------------------+
|106985|217972|0.5846528703121342|
|106985|218329|0.5824472100541838|
|106985|228717|0.5840643942334456|
|107800|237383| 0.579043681145354|
|108171|190629|0.5855307770089813|
|108171|193148|0.5743025408361803|
|108171|197540|0.5848344617556166|
|108171|200249|0.5855999706985752|
|108171|200630|0.5750308535617188|
|108171|204533|0.5834368737767368|
|108171|205308|0.5787746446296952|
|108171|205458|0.5758225088029106|
|108171|206272|0.5821422772432235|
|108171|214193|0.5562703952349195|
|108171|215302|0.5813288020660338|
|108171|216227|0.5855141405790075|
|108171|216953|0.5768856233611499|
|108171|217062|0.5770683441406884|
|108171|217972|0.5534269082021103|
|108171|218101|0.5805848728435947|
+------+------+------------------+
only showing top 20 rows



In [14]:
# Number of film pairs loaded from the export.
df.count()

10210

In [15]:
# Pairs whose score is at most 0.57. "dot" was read without a schema, so the
# comparison relies on Spark casting the string column to a number.
# NOTE(review): good_pair is not used by the later cells visible here, which
# keep working from `df` — confirm whether they should use it.
good_pair = df.where(df["dot"] <= 0.57)

In [11]:
# How many pairs fall under the stricter 0.57 threshold.
good_pair.count()

4292

In [16]:
# Number of paired films j per film i. Reuses the name `values` from the popularity section.
values = df.groupBy('i').count()
values.show()

+------+-----+
|     i|count|
+------+-----+
|234035|   29|
|332231|   14|
|227173|   87|
|238664|    3|
|207831|   12|
|208479|   11|
|221883|   33|
|227823|   46|
|234729|   25|
|382841|    1|
|220905|   35|
|232437|   35|
|110836|    1|
|115668|    2|
|233014|   22|
|215340|    8|
|125506|   17|
|188668|   13|
|235865|   70|
|239845|   79|
+------+-----+
only showing top 20 rows



In [20]:
# Keep only the films that have at least 10 paired films, then report how many remain.
rec_norm_number = values.where("count >= 10")
rec_norm_number.count()

166

In [22]:
# Preview the films that have at least 10 paired films.
rec_norm_number.show()

+------+-----+
|     i|count|
+------+-----+
|234035|   29|
|332231|   14|
|227173|   87|
|207831|   12|
|208479|   11|
|221883|   33|
|227823|   46|
|234729|   25|
|220905|   35|
|232437|   35|
|233014|   22|
|125506|   17|
|188668|   13|
|235865|   70|
|239845|   79|
|200249|  164|
|218106|   92|
|223625|   42|
|240548|   38|
|214193|  102|
+------+-----+
only showing top 20 rows



In [24]:
# Ascending sort by pair count, so the films with the most pairs end up last.
values_sorted = rec_norm_number.sort('count')


In [25]:
# Shows the first rows of the ascending order, i.e. the smallest pair counts.
values_sorted.show()

+------+-----+
|     i|count|
+------+-----+
|213654|   10|
|238073|   10|
|221404|   10|
|206578|   11|
|192238|   11|
|208479|   11|
|239621|   11|
|207831|   12|
|232522|   13|
|188668|   13|
|204093|   13|
|197965|   13|
|120975|   13|
|120621|   14|
|332231|   14|
|187748|   14|
|254046|   14|
|212140|   15|
|256526|   15|
|210745|   15|
+------+-----+
only showing top 20 rows



In [26]:
# Last 100 rows of the ascending order (the largest pair counts), collected to the driver as Row objects.
values_sorted_part_list = values_sorted.tail(100)

In [27]:
# Turn the collected rows back into a Spark DataFrame with columns "i" and "count".
values_sorted_part = spark.createDataFrame(data = values_sorted_part_list, schema = ["i", "count"])

In [28]:
# Preview the selected films (ascending by pair count).
values_sorted_part.show()

+------+-----+
|     i|count|
+------+-----+
|108171|   42|
|221918|   43|
|215179|   45|
|239341|   45|
|227253|   45|
|227823|   46|
|231689|   46|
|221825|   47|
|239111|   47|
|219998|   50|
|233791|   50|
|188313|   51|
|228594|   51|
|224131|   52|
|199566|   52|
|225066|   54|
|221081|   54|
|197479|   55|
|215302|   55|
|199598|   56|
+------+-----+
only showing top 20 rows



In [29]:
# Sanity check: exactly the 100 rows taken by tail(100) are expected.
values_sorted_part.count()

100

In [30]:
from pyspark.sql import functions as F

# One row per film i, with the list of its paired films j. Reuses the name `df2`.
df2 = df.groupBy("i").agg(F.collect_list("j"))


In [31]:
# Number of distinct films i that have at least one pair.
df2.count()

252

In [32]:
# Preview the per-film lists; the aggregated column is named "collect_list(j)".
df2.show()

+------+--------------------+
|     i|     collect_list(j)|
+------+--------------------+
|106985|[217972, 218329, ...|
|107800|            [237383]|
|108171|[190629, 193148, ...|
|108239|[217972, 218329, ...|
|108773|[218329, 218533, ...|
|110836|            [237383]|
|112226|[217972, 218106, ...|
|112895|[214193, 217972, ...|
|113480|            [237383]|
|114231|[217972, 218106, ...|
|114280|[200249, 204533, ...|
|115668|    [218533, 237383]|
|118205|[217972, 218329, ...|
|119019|[214193, 217972, ...|
|120621|[214193, 217972, ...|
|120975|[204533, 205308, ...|
|121002|[163617, 190629, ...|
|121351|[190629, 191918, ...|
|123647|[200249, 202954, ...|
|123920|[204533, 205308, ...|
+------+--------------------+
only showing top 20 rows



In [33]:
# Inspect one full row; the ids are strings because the CSV was read without a schema.
df2.head(1)

[Row(i='106985', collect_list(j)=['217972', '218329', '228717'])]

In [34]:
# Inspect the last row of the grouped frame.
df2.tail(1)

[Row(i='400784', collect_list(j)=['601086'])]

In [35]:
# Inner join on "i": keeps only the films present in values_sorted_part and adds their "count".
df3 = df2.join(values_sorted_part,'i','inner')

In [36]:
# Sanity check: one row per selected film is expected after the join.
df3.count()

100

In [37]:
# Preview the joined frame: film, list of paired films and pair count.
df3.show()

+------+--------------------+-----+
|     i|     collect_list(j)|count|
+------+--------------------+-----+
|108171|[190629, 193148, ...|   42|
|221918|[242813, 244971, ...|   43|
|215179|[254046, 256526, ...|   45|
|239341|[242813, 244971, ...|   45|
|227253|[228717, 230166, ...|   45|
|227823|[252155, 254046, ...|   46|
|231689|[238073, 250344, ...|   46|
|221825|[224131, 225209, ...|   47|
|239111|[242813, 252155, ...|   47|
|219998|[221825, 222409, ...|   50|
|233791|[238073, 242813, ...|   50|
|188313|[214193, 215179, ...|   51|
|228594|[232234, 234409, ...|   51|
|224131|[224166, 224507, ...|   52|
|199566|[214193, 215179, ...|   52|
|225066|[242813, 244971, ...|   54|
|221081|[224507, 227173, ...|   54|
|197479|[214193, 215179, ...|   55|
|215302|[242813, 244971, ...|   55|
|199598|[214193, 215179, ...|   56|
+------+--------------------+-----+
only showing top 20 rows



### Save and predict

In [39]:
# Drop the helper "count" column; keep the film id and its list of paired films.
result = df3.select("i", "collect_list(j)")

In [103]:
# Preview the film -> paired-films table.
result.show()

+------+--------------------+
|     i|     collect_list(j)|
+------+--------------------+
|108171|[190629, 193148, ...|
|221918|[242813, 244971, ...|
|215179|[254046, 256526, ...|
|239341|[242813, 244971, ...|
|227253|[228717, 230166, ...|
|227823|[252155, 254046, ...|
|231689|[238073, 250344, ...|
|221825|[224131, 225209, ...|
|239111|[242813, 252155, ...|
|219998|[221825, 222409, ...|
|233791|[238073, 242813, ...|
|188313|[214193, 215179, ...|
|228594|[232234, 234409, ...|
|224131|[224166, 224507, ...|
|199566|[214193, 215179, ...|
|225066|[242813, 244971, ...|
|221081|[224507, 227173, ...|
|197479|[214193, 215179, ...|
|215302|[242813, 244971, ...|
|199598|[214193, 215179, ...|
+------+--------------------+
only showing top 20 rows



In [164]:
from pyspark.sql import Row

# Keep at most the first 10 paired films per film and rename the columns to
# "film" / "rec films". slice() is 1-based and truncates inside Spark, so the
# rows no longer have to be collected to the driver and turned back into a
# DataFrame. The lists keep the order produced by collect_list; they are not
# ranked by score.
result_10 = result.select(
    psf.col("i").alias("film"),
    psf.slice(psf.col("collect_list(j)"), 1, 10).alias("rec films"),
)

In [174]:
# Preview the truncated recommendation lists.
result_10.show()

+------+--------------------+
|  film|           rec films|
+------+--------------------+
|108171|[190629, 193148, ...|
|221918|[242813, 244971, ...|
|215179|[254046, 256526, ...|
|239341|[242813, 244971, ...|
|227253|[228717, 230166, ...|
|227823|[252155, 254046, ...|
|231689|[238073, 250344, ...|
|221825|[224131, 225209, ...|
|239111|[242813, 252155, ...|
|219998|[221825, 222409, ...|
|233791|[238073, 242813, ...|
|188313|[214193, 215179, ...|
|228594|[232234, 234409, ...|
|224131|[224166, 224507, ...|
|199566|[214193, 215179, ...|
|225066|[242813, 244971, ...|
|221081|[224507, 227173, ...|
|197479|[214193, 215179, ...|
|215302|[242813, 244971, ...|
|199598|[214193, 215179, ...|
+------+--------------------+
only showing top 20 rows



In [171]:
# Bring the first 6 rows to the driver to inspect the full, untruncated lists.
first = result_10.take(6)

In [172]:
# Display the collected sample rows.
first

[Row(film='108171', rec films=['190629', '193148', '197540', '200249', '200630', '204533', '205308', '205458', '206272', '214193']),
 Row(film='221918', rec films=['242813', '244971', '252155', '254046', '256526', '261023', '264081', '266755', '321489', '328238']),
 Row(film='215179', rec films=['254046', '256526', '257591', '260904', '261023', '321489', '323232', '328238', '332231', '335462']),
 Row(film='239341', rec films=['242813', '244971', '250344', '254046', '256526', '260904', '261023', '264081', '266755', '321489']),
 Row(film='227253', rec films=['228717', '230166', '231449', '250344', '252155', '254046', '256526', '257591', '260904', '261023']),
 Row(film='227823', rec films=['252155', '254046', '256040', '256526', '260904', '261023', '264081', '265875', '321489', '323232'])]

In [176]:
# Plain dict {film id: [recommended film ids]} built on the driver for JSON export.
# A comprehension keeps the loop variable local, so the notebook-level `values`
# DataFrame is no longer overwritten by a Python list as a side effect.
# Note: `res` itself is rebound here (it previously held the pairwise-score DataFrame).
res = {row["film"]: list(row["rec films"]) for row in result_10.collect()}

In [178]:
# Spot-check one film; keys are strings because the ids were read from CSV without a schema.
res['108171']

['190629',
 '193148',
 '197540',
 '200249',
 '200630',
 '204533',
 '205308',
 '205458',
 '206272',
 '214193']

In [180]:

# Persist the recommendation dict as JSON.
# NOTE(review): absolute, machine-specific output path.
with open("/home/yagor/Рабочий стол/mipt/spark/recomend_films.json", "w") as fp:
    json.dump(res , fp)

In [186]:
# Read the JSON back to verify the round trip.
# Note: this rebinds `data`, which earlier held the normalised feature DataFrame.
with open('/home/yagor/Рабочий стол/mipt/spark/recomend_films.json') as json_file:
    data = json.load(json_file)

In [187]:
# Spot-check: the reloaded list should match the one shown for `res` before saving.
data['108171']

['190629',
 '193148',
 '197540',
 '200249',
 '200630',
 '204533',
 '205308',
 '205458',
 '206272',
 '214193']

In [188]:
# Number of films with recommendations in the saved file.
len(data)

100