#### Common warnings:

1. __Back up your solution to the 'work' directory inside the home directory ('/home/jovyan'). It is the only directory whose state is saved between sessions.__

1. Make sure that you call the right interpreter (python2 or python3). Do not write just "python" without the major version: there is no guarantee that any particular version of Python is set as the default in the Grading system.

1. One cell must contain only one programming language.
E.g. if a cell contains Python code and you also want to call a bash command in it (using “!”), you should move the bash command to another cell.

1. Our IPython converter is an improved version of the standard converter nbconvert, and it can process most of Jupyter's magic commands correctly (e.g. it understands "%%bash" and executes the cell as a bash script). However, we highly recommend avoiding magics wherever possible.

#### Spark specific warnings:

1. It is good practice to run Spark with master "yarn". However, the performance of the containerized system is limited. If you see repeated Py4JJavaError or Py4JNetworkError exceptions that you believe are unrelated to your code, feel free to change the master to “local”.

1. Remove extra symbols from the output (quotes, brackets, etc.). Once you have the resulting dataframe, it is easier to print wiki.take(1) than to traverse the RDD with a for loop, but that prints a lot of junk symbols, like: `[['Anarchism', 'is', .. ]]`. See the correct output example in the task.
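
The difference between the two output styles can be sketched without Spark. In this minimal example, `rows` is a made-up stand-in for what `wiki.take(1)` might return, and `format_row` is an illustrative helper name, not part of the assignment. The separator is also an assumption; use whatever the task's output example shows.

```python
# Made-up stand-in for the result of wiki.take(1): a list of token lists.
rows = [['Anarchism', 'is', 'a', 'political', 'philosophy']]

def format_row(tokens, sep=" "):
    """Join the items of one row into plain text without quotes or brackets."""
    return sep.join(str(token) for token in tokens)

# Printing the list directly shows Python's repr, with quotes and brackets.
print(rows)

# Printing row by row yields clean text.
for row in rows:
    print(format_row(row))
```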

#### Task hint
Each of these tasks continues the previous one, so you may use the same IPython notebook for all the programming assignments this week.

In [8]:
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.enableHiveSupport().master("local").getOrCreate()

In [9]:
data = spark_session.read.parquet("/data/sample264")
meta = spark_session.read.parquet("/data/meta")

#### Normalization can be done with the following function

In [10]:
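from pyspark.sql import Window
from pyspark.sql.functions import col, row_number, sum

def norm(df, key1, key2, field, n):
    # key2 is not used in the body; it is kept so that existing calls stay valid.

    # Rank rows within each key1 group by field, largest first.
    window = Window.partitionBy(key1).orderBy(col(field).desc())

    # Keep only the top n rows of every group.
    topsDF = df.withColumn("row_number", row_number().over(window)) \
        .filter(col("row_number") <= n) \
        .drop(col("row_number"))

    # Sum field over the rows kept for each key1.
    tmpDF = topsDF.groupBy(col(key1)).agg(col(key1), sum(col(field)).alias("sum_" + field))

    # Divide each value by its group sum and cache the result for reuse.
    normalizedDF = topsDF.join(tmpDF, key1, "inner") \
        .withColumn("norm_" + field, col(field) / col("sum_" + field)) \
        .cache()

    return normalizedDF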
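
The arithmetic that `norm` performs can be checked on a small example without Spark. The sketch below is a plain-Python illustration under stated assumptions: `norm_rows` and the `toy` data are hypothetical names and values, not part of the assignment. It keeps the `n` largest values per key and divides each by the sum of the kept values, which mirrors the window, aggregation, and join steps of the Spark version.

```python
from collections import defaultdict

def norm_rows(rows, n):
    """Keep the n largest values per key and divide each by the sum of the kept values.

    rows is a list of (key, item, value) tuples.
    Returns a list of (key, item, value, total, share) tuples.
    """
    by_key = defaultdict(list)
    for key, item, value in rows:
        by_key[key].append((item, value))

    result = []
    for key, items in by_key.items():
        # Same idea as row_number() over a window ordered by value, descending.
        top = sorted(items, key=lambda pair: pair[1], reverse=True)[:n]
        # The sum covers only the rows that survived the top-n filter.
        total = sum(value for _, value in top)
        for item, value in top:
            result.append((key, item, value, total, value / total))
    return result

# Made-up toy data: (user, track, play count).
toy = [("u1", "a", 3), ("u1", "b", 1), ("u1", "c", 2), ("u2", "d", 5)]
for row in norm_rows(toy, 2):
    print(row)
```

With `n = 2`, track `b` of user `u1` is dropped before the sum is taken, so the remaining shares for `u1` are 3/5 and 2/5 rather than 3/6 and 2/6.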

In [11]:
from pyspark.sql.functions import col, abs

data1 = data.select(col('userId'), col('trackId').alias('trackId1'), col('timestamp').alias('timestamp1'))
data2 = data.select(col('userId'), col('trackId').alias('trackId2'), col('timestamp').alias('timestamp2'))

# Pair every two different tracks that the same user played within 420 timestamp units
# of each other, then count how often each ordered pair occurs.
tracks = data1.join(data2, 'userId') \
              .filter(col('trackId1') != col('trackId2')) \
              .filter(abs(col('timestamp1') - col('timestamp2')) <= 420) \
              .groupby(col('trackId1'), col('trackId2')) \
              .count()

normTracks = norm(tracks, "trackId1", "trackId2", "count", 40)
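
The self-join above can be hard to picture, so here is a plain-Python sketch of the same pairing logic on made-up data. The names `co_listened` and `events` are hypothetical, and the timestamps are assumed to be plain numbers in the same unit as the 420 threshold.

```python
from collections import Counter
from itertools import permutations

def co_listened(events, max_gap=420):
    """Count ordered track pairs played by the same user within max_gap time units."""
    by_user = {}
    for user, track, ts in events:
        by_user.setdefault(user, []).append((track, ts))

    counts = Counter()
    for plays in by_user.values():
        # Every ordered pair of distinct plays of one user, like the join on userId.
        for (t1, ts1), (t2, ts2) in permutations(plays, 2):
            if t1 != t2 and abs(ts1 - ts2) <= max_gap:
                counts[(t1, t2)] += 1
    return counts

# Made-up toy events: (user, track, timestamp).
events = [
    ("u1", "A", 0), ("u1", "B", 300), ("u1", "C", 1000),
    ("u2", "A", 10), ("u2", "B", 20),
]
for pair, count in sorted(co_listened(events).items()):
    print(pair, count)
```

Each pair is counted in both directions, as in the Spark version, because the join produces both (trackId1, trackId2) and the swapped row.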

In [12]:
# results = (normTracks
#  .orderBy(col("norm_count").desc(), col("trackId1"), col("trackId2"))
#  .limit(40)
# )
# results = results.select(col('trackId1'), col('trackId2')).collect()

In [14]:
users = data.select(col('userId'), col('trackId'))
tracksCounts = users.groupBy(col('userId'), col('trackId')).count()
normalizedTracks = norm(tracksCounts, 'userId','trackId','count',  1000)

+------+-------+-----+---------+------------------+
|userId|trackId|count|sum_count|        norm_count|
+------+-------+-----+---------+------------------+
|  3175| 947718|    1|        9|0.1111111111111111|
|  3175| 940951|    1|        9|0.1111111111111111|
|  3175| 845631|    1|        9|0.1111111111111111|
|  3175| 864690|    1|        9|0.1111111111111111|
|  3175| 831005|    1|        9|0.1111111111111111|
|  3175| 930432|    1|        9|0.1111111111111111|
|  3175| 965012|    1|        9|0.1111111111111111|
|  3175| 858940|    1|        9|0.1111111111111111|
|  3175| 829307|    1|        9|0.1111111111111111|
|  5518| 961148|    3|        6|               0.5|
+------+-------+-----+---------+------------------+



In [16]:
# result = normalizedTracks.orderBy(col('norm_count').desc(), col('userId'), col('trackId')).limit(40)
# result = result.select(col('userId'), col('trackId')).collect()

In [21]:
# artists = data.select(col('userId').alias('id1'), col('artistId').alias('id2'))
# artistsCounts = artists.groupBy(col('id1'), col('id2')).count()
# normArtists = norm(artistsCounts, 'id1', 'id2', 'count', 100)
# result = normArtists.orderBy(col('norm_count').desc(), col('id1'), col('id2')).limit(40)
# result = result.select(col('id1'), 'id2').collect()

In [23]:
tracksAndArtists = data.select(col('artistId').alias('id1'), col('trackId').alias('id2'))
grouped = tracksAndArtists.groupBy('id1', 'id2').count()
normalized = norm(grouped, 'id1', 'id2', 'count', 100)
result = normalized.orderBy(col('norm_count').desc(), col('id1'), col('id2')).limit(40)
result = result.select(col('id1'), 'id2').collect()

In [5]:
spark_session.stop()

#### Final notice:

1. Please note that you must __not__ redirect __stderr__ anywhere. Hadoop, Hive, and Spark print their logs to stderr, and the Grading system also reads and analyzes it.

1. When checking the code from the notebook, the system runs all of the notebook's cells and reads the output of only the last non-empty cell. No exception may be thrown in any of the cells. If you decide to write some text in a cell, you should change the cell type to Markdown (Cell -> Cell type -> Markdown).

1. The Grader takes into account the output for the sample dataset that is stored in the notebook. Therefore, you have to "Run All" cells in the notebook before you submit the ipynb solution.

1. The name of the notebook must contain only Latin letters, digits, and the characters “-” or “_”. For example, Windows appends something like " (2)" (with a leading space) to the filename if you download a file whose name already exists. This is a problem, because the name then contains a space and the parentheses "(" and ")".
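
The naming rule can be checked locally before submission. This is a minimal sketch: `is_valid_notebook_name` is a hypothetical helper, the example filenames are made up, and treating ".ipynb" as the only allowed extension is an assumption. The Grader's actual check is not documented here.

```python
import re

# Letters, digits, "-" and "_" only, followed by the .ipynb extension (extension assumed).
VALID_NAME = re.compile(r"[A-Za-z0-9_-]+\.ipynb")

def is_valid_notebook_name(filename):
    """Return True if the whole filename matches the allowed pattern."""
    return VALID_NAME.fullmatch(filename) is not None

print(is_valid_notebook_name("week5_task1.ipynb"))      # True
print(is_valid_notebook_name("week5_task1 (2).ipynb"))  # False: space and parentheses
```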

In [24]:
for val in result:
    print("%s %s" % val)

967993 869415
967998 947428
968004 927380
968017 859321
968022 852786
968034 807671
968038 964150
968042 835935
968043 913568
968046 935077
968047 806127
968065 907906
968073 964586
968086 813446
968092 837129
968118 914441
968125 821410
968140 953008
968148 877445
968161 809793
968163 803065
968168 876119
968189 858639
968221 896937
968224 892880
968232 825536
968237 932845
968238 939177
968241 879045
968242 911250
968248 953554
968255 808494
968259 880230
968265 950148
968266 824437
968269 913243
968272 816049
968278 946743
968285 847460
968286 940006
