#### Common warnings:

1. __Back up your solution to the 'work' directory inside the home directory ('/home/jovyan'). It is the only directory whose state is saved across sessions.__

1. Please make sure that you call the right interpreter (python2 or python3). Do not write just "python" without the major version: there is no guarantee that any particular version of Python is set as the default in the grading system.

1. Each cell must contain only one programming language.
E.g. if a cell contains Python code and you also want to call a bash command (using “!”) in it, move the bash command to another cell.

1. Our IPython converter is an improved version of the standard converter nbconvert, and it can process most of Jupyter's magic commands correctly (e.g. it understands "%%bash" and executes the cell as a bash script). However, we highly recommend avoiding magics wherever possible.
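
The interpreter warning above can be backed by a small guard at the top of a script, so that a wrong default interpreter fails early with a clear message. This is a minimal sketch assuming the solution targets Python 3; the helper name `require_major` is hypothetical and not part of the grading system.

```python
import sys


def require_major(expected_major, version_info=None):
    """Return True if the interpreter has the expected major version.

    `version_info` defaults to sys.version_info; it is a parameter only so
    that the check can be exercised without switching interpreters.
    """
    info = sys.version_info if version_info is None else version_info
    return info[0] == expected_major


if not require_major(3):
    # Fail early instead of hitting a confusing syntax error later on.
    raise SystemExit("This solution expects python3; run it as `python3 script.py`.")
```

Calling the script explicitly as `python3 script.py` remains the primary safeguard; the guard only makes a mismatch obvious.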

#### Spark specific warnings:

1. It is good practice to run Spark with master "yarn". However, the performance of the containerized system is limited. If you see repeated Py4JJavaError or Py4JNetworkError exceptions that you believe are unrelated to your code, feel free to change the master to “local”.

1. Eliminate extra symbols in the output (quotes, brackets, etc.). When you finally get the resulting dataframe, it is easier to print wiki.take(1) than to traverse the RDD with a for loop, but in that case a lot of junk symbols are printed, like: `[['Anarchism', 'is', .. ]]`. See the correct output example in the task.
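
The output warning above amounts to joining the fields of each row yourself instead of printing the collected list. This is a minimal sketch; the helper name `format_row` is hypothetical, and the sample row contains only the two tokens visible in the example above.

```python
def format_row(row):
    """Join the fields of one row with single spaces, without quotes or brackets."""
    return " ".join(str(field) for field in row)


# Stand-in for the result of a call such as wiki.take(1).
rows = [["Anarchism", "is"]]

for row in rows:
    print(format_row(row))
```

The same helper works for numeric fields, since each field is converted with `str` before joining.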

#### Task hint
Each of these tasks is a continuation of the previous one, so you may use the same IPython notebook for all the programming assignments this week.

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import Window

spark_session = SparkSession.builder.enableHiveSupport().master("yarn").getOrCreate()

In [2]:
data = spark_session.read.parquet("/data/sample264")
meta = spark_session.read.parquet("/data/meta")

In [4]:
data.show(10)

+------+-------+--------+----------+
|userId|trackId|artistId| timestamp|
+------+-------+--------+----------+
| 13065| 944906|  978428|1501588527|
|101897| 799685|  989262|1501555608|
|215049| 871513|  988199|1501604269|
|309769| 857670|  987809|1501540265|
|397833| 903510|  994595|1501597615|
|501769| 818149|  994975|1501577955|
|601353| 958990|  973098|1501602467|
|710921| 916226|  972031|1501611582|
|  6743| 801006|  994339|1501584964|
|152407| 913509|  994334|1501571055|
+------+-------+--------+----------+
only showing top 10 rows



Consider joining the graph to itself on userId, then remove pairs with the same track.
For each track, choose the top 50 tracks most similar to it, ordered by weight, and normalize the weights of its edges (divide the weight of each edge by the sum of the weights of all edges).
Use rank() to choose the top 40 tracks, as is done in the demo.

In [12]:
# Join the graph with itself on userId.
# Take the timestamp difference and keep pairs listened to within 7 minutes (420 seconds).
# Remove pairs with the same track.
filtered_df = (data.alias('temp_df1').join(data.alias('temp_df2'), 'userId')
               .withColumn('timestamp_diff', f.col('temp_df1.timestamp') - f.col('temp_df2.timestamp'))
               .where((f.col('timestamp_diff') > 0) 
                      & (f.col('timestamp_diff') <= 420) 
                      & (f.col('temp_df1.trackId') != f.col('temp_df2.trackId')))
              )

tracks = (filtered_df.select(f.col('temp_df1.trackId').alias('track1'), f.col('temp_df2.trackId').alias('track2'))
          .withColumn('id1', f.when((f.col('track1') < f.col('track2')), f.col('track1')).otherwise(f.col('track2')))
          .withColumn('id2', f.when((f.col('track1') < f.col('track2')), f.col('track2')).otherwise(f.col('track1')))
          .select('id1', 'id2')
          .groupBy(f.col('id1'), f.col('id2')).count()
         )
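
To check the pairing rule on a small scale, the same logic can be sketched in plain Python. This is an illustrative sketch on made-up events, not the Spark job itself; the function name `count_track_pairs` and the sample tuples are assumptions.

```python
from collections import Counter
from itertools import permutations


def count_track_pairs(events, max_gap=420):
    """Count unordered track pairs played by the same user within max_gap seconds.

    events: iterable of (user_id, track_id, timestamp) tuples.
    Applies the same filter as above: 0 < t1 - t2 <= max_gap and track1 != track2.
    """
    by_user = {}
    for user_id, track_id, ts in events:
        by_user.setdefault(user_id, []).append((track_id, ts))

    counts = Counter()
    for plays in by_user.values():
        # permutations yields ordered pairs of distinct plays, like the self-join
        # on userId (a play paired with itself has diff 0 and is filtered anyway).
        for (track1, ts1), (track2, ts2) in permutations(plays, 2):
            diff = ts1 - ts2
            if 0 < diff <= max_gap and track1 != track2:
                # Store the smaller id first so (a, b) and (b, a) share one edge.
                counts[(min(track1, track2), max(track1, track2))] += 1
    return counts


# Made-up events: (user_id, track_id, timestamp in seconds).
events = [
    (1, 10, 1000), (1, 20, 1100), (1, 30, 2000),  # track 30 is too late to pair
    (2, 20, 5000), (2, 10, 5300),
]
print(dict(count_track_pairs(events)))
```

Both users contribute to the same edge here, because the pair is stored with the smaller track id first regardless of listening order.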

#### Normalization can be done with the following function

In [17]:
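from pyspark.sql import Window
from pyspark.sql.functions import col, row_number
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing the built-in sum

# Normalize the weights of the edges (divide the weight of each edge by the sum of the weights of all kept edges).
def norm(df, key1, key2, field, n):
    # key2 is unused; it is kept so that existing calls keep working.

    # Number the rows of each key1 group from the largest `field` value down.
    window = Window.partitionBy(key1).orderBy(col(field).desc())

    # Keep only the top n rows per key1.
    topsDF = df.withColumn("row_number", row_number().over(window)) \
        .filter(col("row_number") <= n) \
        .drop("row_number")

    # Sum of `field` over the kept rows of each key1 (groupBy already keeps key1).
    tmpDF = topsDF.groupBy(col(key1)).agg(spark_sum(col(field)).alias("sum_" + field))

    normalizedDF = topsDF.join(tmpDF, key1, "inner") \
        .withColumn("norm_" + field, col(field) / col("sum_" + field)) \
        .cache()

    return normalizedDF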
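
The normalization above keeps the top n edges per key and divides each weight by the sum over the kept edges. The arithmetic can be checked with a plain-Python sketch on made-up edges; the function name `normalize_top_n` and the sample data are assumptions, and this is not a replacement for the Spark function.

```python
from collections import defaultdict


def normalize_top_n(edges, n):
    """Keep the n heaviest edges per source id and divide each weight by their sum.

    edges: iterable of (id1, id2, weight) tuples.
    Returns a list of (id1, id2, normalized_weight) tuples.
    """
    by_source = defaultdict(list)
    for id1, id2, weight in edges:
        by_source[id1].append((id2, weight))

    result = []
    for id1, neighbours in by_source.items():
        # sorted() is stable, so tied weights keep their input order here;
        # Spark gives no such guarantee for ties inside a window.
        top = sorted(neighbours, key=lambda item: item[1], reverse=True)[:n]
        total = sum(weight for _, weight in top)
        result.extend((id1, id2, weight / total) for id2, weight in top)
    return result


# Made-up edges: (id1, id2, count).
edges = [(1, 2, 3), (1, 3, 1), (1, 4, 1), (5, 6, 7)]
for row in normalize_top_n(edges, n=2):
    print(row)
```

Note that a track with a single edge always gets a normalized weight of 1.0, and that the weights kept for one track sum to 1.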

In [18]:
normalized_tracks = norm(tracks, 'id1', 'id2', 'count', 40).select('id1', 'id2', 'norm_count')

In [25]:
normalized_tracks.take(50)

[Row(id1=798477, id2=883244, norm_count=1.0),
 Row(id1=798692, id2=898823, norm_count=1.0),
 Row(id1=800467, id2=855206, norm_count=1.0),
 Row(id1=801701, id2=920990, norm_count=1.0),
 Row(id1=802599, id2=908754, norm_count=0.03571428571428571),
 Row(id1=802599, id2=937714, norm_count=0.03571428571428571),
 Row(id1=802599, id2=811513, norm_count=0.03571428571428571),
 Row(id1=802599, id2=929402, norm_count=0.03571428571428571),
 Row(id1=802599, id2=924227, norm_count=0.03571428571428571),
 Row(id1=802599, id2=901687, norm_count=0.03571428571428571),
 Row(id1=802599, id2=860294, norm_count=0.03571428571428571),
 Row(id1=802599, id2=880642, norm_count=0.03571428571428571),
 Row(id1=802599, id2=920627, norm_count=0.03571428571428571),
 Row(id1=802599, id2=843219, norm_count=0.03571428571428571),
 Row(id1=802599, id2=892457, norm_count=0.03571428571428571),
 Row(id1=802599, id2=823001, norm_count=0.03571428571428571),
 Row(id1=802599, id2=899859, norm_count=0.03571428571428571),
 Row(id1=8

In [22]:
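# Rank all edges globally by normalized weight; tied weights share the same rank.
window = Window.orderBy(f.col('norm_count').desc())

# Keep ranks 1..40, sort by id1 then id2, and take the first 40 pairs.
TrackList = normalized_tracks.withColumn('position', f.rank().over(window)) \
    .filter(f.col('position') <= 40) \
    .orderBy(f.col('id1'), f.col('id2')) \
    .select('id1', 'id2') \
    .take(40)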
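
Because `rank()` assigns the same position to tied values, a filter on the rank column can keep more rows than the cutoff suggests, which is why the final `take(40)` still matters. The tie behavior can be sketched in plain Python; the function name `rank_desc` and the sample weights are assumptions.

```python
def rank_desc(values):
    """Return (value, rank) pairs with SQL-style rank() over a descending order.

    Tied values share a position and the next distinct value skips ahead,
    e.g. [1.0, 1.0, 0.5] gets ranks [1, 1, 3].
    """
    ordered = sorted(values, reverse=True)
    ranks = []
    for index, value in enumerate(ordered):
        if index > 0 and value == ordered[index - 1]:
            ranks.append(ranks[-1])   # tie: reuse the previous position
        else:
            ranks.append(index + 1)   # new value: 1-based row index
    return list(zip(ordered, ranks))


# Made-up normalized weights with a three-way tie at the top.
weights = [0.25, 1.0, 1.0, 0.5, 1.0]
print(rank_desc(weights))
```

With many edges tied at 1.0, every one of them receives rank 1, so the ordering by id1 and id2 decides which pairs end up in the printed result.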

In [23]:
for val in TrackList:
    print("%s %s" % val)

798256 923706
798319 837992
798322 876562
798331 827364
798335 840741
798374 816874
798375 810685
798379 812055
798380 840113
798396 817687
798398 926302
798405 867217
798443 905923
798457 918918
798460 891840
798461 940379
798470 840814
798474 963162
798477 883244
798485 955521
798505 905671
798545 949238
798550 936295
798626 845438
798691 818279
798692 898823
798702 811440
798704 937570
798725 933147
798738 894170
798745 799665
798782 956938
798801 950802
798820 890393
798833 916319
798865 962662
798931 893574
798946 946408
799012 809997
799024 935246
