# Notebook: Python Spark DataFrame API – Graphs with DataFrames – ESILV 2025


## Preparation
*   ***Check that computing resources*** are allocated to your notebook, i.e. that it is
connected (see the RAM/Disk indicator at the top right). If not, click the Connect button to obtain resources.

*   ***Create the directory*** that stores the necessary files on your Google
Drive (give the notebook permission to access your Drive when requested). *Adjust the folder name to your own*: **MyDrive/ens/esilv/data/**

In [None]:
import os
from google.colab import drive

# Attach Google Drive to the Colab VM (force_remount=True remounts it even if
# it is already mounted).
MOUNT_POINT = "/content/drive"
drive.mount(MOUNT_POINT, force_remount=True)

# Folder that stores the datasets; adjust the path components to your own Drive.
# The trailing "" makes os.path.join end the path with "/", because later cells
# append file names directly to drive_dir.
drive_dir = os.path.join(MOUNT_POINT, "MyDrive", "ens", "esilv", "data", "")
os.makedirs(drive_dir, exist_ok=True)
os.listdir(drive_dir)

Mounted at /content/drive


['data.csv.gz',
 'meta.csv.gz',
 'airports.csv',
 'airlines.csv',
 '188591317_T_ONTIME.csv.gz']

***Install pyspark and findspark:***

In [None]:
!pip install -q pyspark
!pip install -q findspark

***Install GraphFrames :***

In [None]:
!pip install -q graphframes

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/154.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m153.6/154.7 kB[0m [31m8.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.7/154.7 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# Optional fallback: put the GraphFrames jar directly into pyspark's jars/ folder.
# The session cell also requests the same artifact through spark.jars.packages.
# The previous command downloaded from Bintray, which has been shut down (it only
# received a tiny error page), and wrote into a hard-coded python3.10 directory
# that does not exist on this runtime. The jar is now fetched from the
# spark-packages repository into the directory of the installed pyspark package.
import importlib.util
import os
from urllib import request

GF_VERSION = "0.8.3-spark3.5-s_2.12"
GF_JAR = "graphframes-" + GF_VERSION + ".jar"
GF_URL = ("https://repos.spark-packages.org/graphframes/graphframes/"
          + GF_VERSION + "/" + GF_JAR)

_pyspark_spec = importlib.util.find_spec("pyspark")
if _pyspark_spec is None or _pyspark_spec.origin is None:
    raise ModuleNotFoundError("pyspark is not installed: run the pip install cell first")
gf_jar_path = os.path.join(os.path.dirname(_pyspark_spec.origin), "jars", GF_JAR)

if os.path.isfile(gf_jar_path):
    print(GF_JAR, "is already installed in", os.path.dirname(gf_jar_path))
else:
    # urlretrieve raises on HTTP errors instead of silently saving an error page.
    request.urlretrieve(GF_URL, gf_jar_path)
    print("downloaded", GF_URL, "to", gf_jar_path)

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   164  100   164    0     0   1123      0 --:--:-- --:--:-- --:--:--  1131
100   146  100   146    0     0    244      0 --:--:-- --:--:-- --:--:--   705
curl: (23) Failure writing output to destination


***Start the spark session:***

In [None]:
import importlib.util
import os

# Point Spark at the pip-installed pyspark package. The location is looked up
# rather than hard-coded, because a path such as .../python3.12/dist-packages/...
# breaks as soon as the runtime's Python version changes.
_pyspark_spec = importlib.util.find_spec("pyspark")
if _pyspark_spec is None or _pyspark_spec.origin is None:
    raise ModuleNotFoundError("pyspark is not installed: run the pip install cell first")
os.environ["SPARK_HOME"] = os.path.dirname(_pyspark_spec.origin)
# Spark's launch scripts run $JAVA_HOME/bin/java; this assumes a JDK whose
# launcher is /usr/bin/java (as on the Colab image) -- adjust on other machines.
os.environ["JAVA_HOME"] = "/usr"

In [None]:
# Main imports
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkConf

# for dataframe and udf
# NOTE: the exercise cells below rely on these wildcard imports (e.g. col,
# dense_rank, lit). They also shadow Python builtins such as sum, abs, min, max
# and round with their Spark column counterparts for the rest of the notebook.
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *

# initialise environment variables for spark (uses the SPARK_HOME set above)
findspark.init()

# Maven coordinate of GraphFrames, built for Spark 3.5 / Scala 2.12.
GRAPHFRAMES_PACKAGE = "graphframes:graphframes:0.8.3-spark3.5-s_2.12"


# Start spark session
# --------------------------
def start_spark(app_name="TP", master="local[*]", memory="6G",
                shuffle_partitions=4, packages=GRAPHFRAMES_PACKAGE):
    """Create (or reuse) the notebook's SparkSession and return it.

    Parameters
    ----------
    app_name : str
        Spark application name.
    master : str
        Spark master URL; ``local[*]`` uses every local core.
    memory : str
        Value used for both ``spark.driver.memory`` and ``spark.executor.memory``.
    shuffle_partitions : int
        Value of ``spark.sql.shuffle.partitions``; keep it close to the number of
        cores of the machine (the default 4 matches a 4-core setup).
    packages : str
        Comma-separated Maven coordinates passed to ``spark.jars.packages``.

    Returns
    -------
    pyspark.sql.SparkSession
        The session; its application id is printed.
    """
    conf = (
        SparkConf()
        .setAppName(app_name)
        .setMaster(master)
        .set("spark.executor.memory", memory)
        .set("spark.driver.memory", memory)
        .set("spark.sql.catalogImplementation", "in-memory")
        .set("spark.jars.packages", packages)
    )

    # getOrCreate() returns the already running session if there is one; static
    # settings such as the driver memory are then not re-applied.
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    # Disable automatic broadcast joins.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    # Adjust the query execution environment to the size of the cluster.
    spark.conf.set("spark.sql.shuffle.partitions", str(shuffle_partitions))
    print("session started, its id is ", sc.applicationId)
    return spark


spark = start_spark()

session started, its id is  local-1765487329930


## Synthetic data on music

##### Data description:

- File **data.csv**: contains information on songs (trackId) performed by artists (artistId) and listened to by users (userId) on a date given by a timestamp. Contains 260664 lines.
- File **meta.csv**: contains the names (field 'Name') of the songs if type==track or of the artists if type==artist.
  'Artist' is the name of the artist performing the song if type==track, or the name of the artist itself (same value as 'Name') if type==artist. 'Id' is the identifier of the song or artist. Contains 44319 lines.

In [None]:
# Public share that hosts the data files (data.csv.gz and meta.csv.gz).
# If downloading from the notebook fails, open this URL in a browser and
# fetch the files by hand.
PUBLIC_DATASET_URL = "https://nuage.lip6.fr/s/LqD9N23kxrfHopr"

# Prefix of the direct-download endpoint; "/<folder>/<file>" is appended to it.
PUBLIC_DATASET = f"{PUBLIC_DATASET_URL}/download?path="

print("URL for data: ", PUBLIC_DATASET_URL)

URL for data:  https://nuage.lip6.fr/s/LqD9N23kxrfHopr


## Read data




In [None]:
import os
from urllib import request


def load_file(file, dir):
    """Download ``file`` from folder ``dir`` of the public share into ``drive_dir``.

    Nothing is downloaded when ``drive_dir/file`` already exists. The data is
    written to a ``.part`` file first and renamed only once the transfer has
    completed, so an interrupted download is not mistaken for a stored file on
    the next run.

    Parameters
    ----------
    file : str
        Name of the file to fetch; also used as the local file name.
    dir : str
        Folder of the public share that contains ``file``.
    """
    # Parameter names are kept for backward compatibility even though ``dir``
    # shadows the builtin inside this function.
    target = os.path.join(drive_dir, file)
    if os.path.isfile(target):
        print(file, "is already stored")
        return

    url = PUBLIC_DATASET + "/" + dir + "/" + file
    print("downloading from URL: ", url, "save in : " + target)
    partial = target + ".part"
    try:
        request.urlretrieve(url, partial)
    except BaseException:
        # Do not leave a truncated file behind on failure or interruption.
        if os.path.exists(partial):
            os.remove(partial)
        raise
    os.replace(partial, target)


load_file("data.csv.gz", "musique")
load_file("meta.csv.gz", "musique")

# List the downloaded files
print("Files downloaded:")
os.listdir(drive_dir)

data.csv.gz is already stored
meta.csv.gz is already stored
Files downloaded:


['data.csv.gz',
 'meta.csv.gz',
 'airports.csv',
 'airlines.csv',
 '188591317_T_ONTIME.csv.gz']

In [None]:
# The folder containing the imported csv files. It is derived from drive_dir
# (the folder created at the top of the notebook) so the path is defined only
# once; normpath drops the trailing "/" because file names are appended with a
# leading "/" below.
DATASET_DIR = os.path.normpath(drive_dir)

In [None]:
# ==============
# Listening events: one row per (user, track, artist, timestamp)
# ==============
schema = """
          userId LONG,
          trackId LONG,
          artistId LONG,
          timestamp LONG
        """
data_path = DATASET_DIR + "/data.csv.gz"
print("Reading file: ", data_path)
data = spark.read.csv(data_path, schema=schema, header=True).persist()
# Sanity checks: data.show(5); data.count() should give 260664


Reading file:  /content/drive/MyDrive/ens/esilv/data/data.csv.gz


In [None]:
# ==============
# Metadata: names of tracks and artists
# ==============
schema = """
          type STRING,
          Name STRING,
          Artist STRING,
          Id LONG
        """
meta_path = DATASET_DIR + "/meta.csv.gz"
print("reading file: ", meta_path)
meta = spark.read.csv(meta_path, schema=schema, header=True).persist()

meta.show(5)
meta.count()  # expected: 44319

## Build the graph


In [None]:
#function used to calculate arc weights
#df: dataframe, source: name of column containing arc source nodes
#weight: initial weight before normalization, n: maximum number of arcs to keep for each source
from pyspark.sql.functions import col, row_number, sum
from pyspark.sql import Window


def compute_weight(df, source, weight, n):
    """Keep the ``n`` heaviest outgoing arcs of every source node and normalise them.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Arc list; must contain the columns named by ``source`` and ``weight``.
    source : str
        Name of the column holding the source node of each arc.
    weight : str
        Name of the column holding the raw (not yet normalised) arc weight.
    n : int
        Maximum number of arcs kept for each source node.

    Returns
    -------
    pyspark.sql.DataFrame
        The kept arcs. ``source`` is the first column, followed by the other
        columns of ``df``, then ``sum_<weight>`` (total weight of the arcs kept
        for that source) and ``norm_<weight>`` (``weight / sum_<weight>``, so the
        kept weights of each source add up to 1). The result is cached. Rows
        whose ``source`` is null are dropped.
    """
    sum_col = "sum_" + weight
    norm_col = "norm_" + weight

    # Rank the arcs of each source by decreasing weight. The remaining columns
    # break ties, so the same n arcs are kept on every run (ordering by weight
    # alone lets row_number pick an arbitrary subset among equal weights).
    tie_breakers = [col(c) for c in df.columns if c not in (source, weight)]
    by_weight = Window.partitionBy(source).orderBy(col(weight).desc(), *tie_breakers)

    top_n = (
        df.filter(col(source).isNotNull())
        .withColumn("row_number", row_number().over(by_weight))
        .filter(col("row_number") <= n)
        .drop("row_number")
    )

    # Per-source total over the whole partition (unordered window), instead of
    # a groupBy followed by a join back on `source`.
    per_source = Window.partitionBy(source)
    other_cols = [c for c in top_n.columns if c != source]

    finalDF = (
        top_n.withColumn(sum_col, sum(col(weight)).over(per_source))
        .withColumn(norm_col, col(weight) / col(sum_col))
        .select(source, *other_cols, sum_col, norm_col)
        .cache()
    )
    return finalDF

 ### Building weighted links between users and songs
* Build a userTrack DataFrame from data to store arcs between users and songs. For each user (userId) we add an arc to a song (trackId) with a weight equal to the total number of times the user has listened to the song. Use the compute_weight function to save for each user the 100 songs with the highest weight and normalize the weights of the saved arcs.   
    
* Display the result: keep only the arcs with the 5 highest possible weight values (use the dense_rank() function and the over(window) window). Display 20 rows of the result, sorting in descending order of weights, then in ascending order of userId and trackId.

In [None]:
# Exercise cell -- user -> track arcs. Each `...` is a placeholder still to be
# written; the cell cannot run until every placeholder is filled in.
# NOTE(review): `rank` is imported but unused, while `dense_rank` (used below) is
# only in scope through the `from pyspark.sql.functions import *` of the session
# cell -- consider importing dense_rank explicitly.
from pyspark.sql.functions import col, rank

#number of times each user listens to a track
# (presumably a count of `data` rows grouped by userId and trackId)
userTrack = ...

#calculate final weight using compute_weight
# compute_weight names the normalised column "norm_" + weight; the window below
# reads "norm_count", which implies weight="count" (with source="userId", n=100).
userTrack = ...

# Ranking of each user's arcs by decreasing normalised weight.
window = Window.partitionBy("userId").orderBy(col("norm_count").desc())

userTrackList = userTrack.withColumn("position", dense_rank().over(window))\
       ...\
       .take(20)


# Each collected row must unpack into exactly three fields for this format string.
for val in userTrackList:
   print("%s %s %s" % val)


# Reference values: 210675 arcs and the schema below.
userTrack.count() # 210675
userTrack.printSchema()
#root
# |-- userId: long (nullable = true)
# |-- trackId: long (nullable = true)
# |-- norm_count: double (nullable = true)

### Building weighted links between users and artists
* Build a userArtist DataFrame from data to store arcs between users and artists. For each user (userId) we add an arc to an artist (artistId) with a weight equal to the total number of times the user has listened to songs by this artist. Use the compute_weight function to keep for each user at most 100 artists with the highest weight and normalize the weights of the arcs kept.   
    
* Display the result: keep only the arcs with the 5 highest possible weight values (use the dense_rank() function and the over(window) window).
   Display 20 rows of the result, sorting in descending order of weights, then in ascending order of userId and artistId.

In [None]:
# Exercise cell -- user -> artist arcs (each `...` is a placeholder still to be written).
# weight=number of times a user has listened to tracks by this artist
# group data by userId and artistId
userArtist = ...

#calculate final weight using compute_weight
# (the window below reads "norm_count", i.e. compute_weight called with weight="count")
userArtist = ....persist()

window = Window.partitionBy("userId").orderBy(col("norm_count").desc())

userArtistList = userArtist.withColumn("position", dense_rank().over(window))\
                ...\
                .take(20)

# NOTE(review): this format expects four fields per row, whereas the user -> track
# cell prints three; make it match the columns actually selected above.
for val in userArtistList:
   print("%s %s %s %s" % val)

# Reference values: 178419 arcs and the schema below.
userArtist.count() #178419
userArtist.printSchema()
#root
# |-- userId: long (nullable = true)
# |-- artistId: long (nullable = true)
# |-- norm_count: double (nullable = true)

### Building weighted links between artists and songs
* Build an artistTrack DataFrame from data to store arcs between artists and songs. For each artist (artistId), add an arc to a song (trackId) with a weight equal to the total number of times that artist's song has been listened to by all users. Use the compute_weight function to keep for each artist at most 100 songs with the highest weight and normalize the weights of the arcs kept.   
    
* Display the result: keep only the arcs with the 5 highest possible weight values (use the dense_rank() function and the over(window) window).
   Display 20 rows of the result, sorting in descending order of weights, then in ascending order of artistId and trackId.

In [None]:
# Exercise cell -- artist -> track arcs (each `...` is a placeholder still to be written).
# arc weight: number of times an artist's track has been listened to by all users

artistTrack = ...

# NOTE: when filling this in, keep a `.` before persist() -- the other cells
# write `....persist()`, whereas `...persist()` here has no attribute access.
artistTrack = ...persist()

window = Window.partitionBy("artistId").orderBy(col("norm_count").desc())


artistTrackList = artistTrack.withColumn("position", dense_rank().over(window))\
       ...\
       .take(20)

for val in artistTrackList:
   print("%s %s %s" % val)

# Reference values: 35408 arcs and the schema below.
artistTrack.count() # 35408
artistTrack.printSchema()
#root
# |-- artistId: long (nullable = true)
# |-- trackId: long (nullable = true)
# |-- norm_count: double (nullable = true)

### Building weighted links between songs
* Build a trackTrack DataFrame from data to store arcs between songs. An arc exists between trackId1 and trackId2 if at least one user has listened to both songs. The total weight of an arc between trackId1 and trackId2 is the total number of users who have listened to both trackId1 and trackId2 within 10 minutes (note that the graph is undirected, trackTrack contains both an entry for (trackId1, trackId2) and an entry for (trackId2, trackId1)).   
Use the compute_weight function to keep for each song at most 100 songs with the highest weight and normalize the weights kept.   
    
* Display the result: keep only the arcs with the 5 highest possible weight values (use the dense_rank() function and the over(window) window).
   Display 20 rows of the result, sorting in descending order of weights, then in ascending order of trackId and track1.

In [None]:
# Exercise cell -- track <-> track arcs (each `...` is a placeholder still to be written).
# NOTE(review): `from datetime import *` repeats the session cell's import, and the
# `abs` imported here (like the one from the session cell's wildcard import) is
# Spark's column function, not Python's builtin abs().
from datetime import *
from pyspark.sql.functions import abs

# Build trackId pairs listened to by the same user
# (the statement requires both listens to be within 10 minutes of each other;
# check the unit of `timestamp` -- seconds or milliseconds -- before choosing the threshold)
...


#for each trackId pair, the number of users who listened to them together
# (the graph is undirected: both (a, b) and (b, a) must be present)
...

#calculate final weight using compute_weight
trackTrack = ....persist()

window = Window.partitionBy("trackId").orderBy(col("norm_count").desc())

trackTrackList = trackTrack.withColumn("position", dense_rank().over(window))\
       ...\
       .take(20)

for val in trackTrackList:
   print("%s %s %s" % val)

# Reference values: 136257 arcs; the neighbour column is named track1.
trackTrack.count() #136257
trackTrack.printSchema()
#root
# |-- trackId: long (nullable = true)
# |-- track1: long (nullable = true)
# |-- norm_count: double (nullable = true)


## Building the final graph

Build a graph DataFrame to store all the nodes and links calculated above. The DataFrame will contain a 'source' column (source node identifier), a 'dest' column (destination node identifier) and a 'weight' column.
The 'source' and 'dest' columns mix user, song and artist IDs. The 'weight' column contains the normalized arc weights computed above, multiplied by the following coefficients:
* Links user->artist: 0.5
* Links user->track: 0.5
* Links track->track: 1
* Links artist->track: 1

In [None]:
# Exercise cell -- union of the four arc lists into a single edge DataFrame, each
# normalised weight multiplied by the coefficient given in the statement
# (each `...` is a placeholder still to be written).
...

graph = ....persist()
graph.count() #560759
graph.printSchema()
# NOTE(review): the statement calls the weight column 'weight', but this reference
# schema names it 'poids' (French for "weight"); use one name consistently.
#root
# |-- source: long (nullable = true)
# |-- dest: long (nullable = true)
# |-- poids: double (nullable = true)

### Computing song recommendations with PPR

Using the Personalized PageRank calculation, recommend to the user with ID 10 the songs he hasn't listened to. Recall the formula for updating the recommendation score at each iteration of the calculation:

x[i] = (1-d) * v[i] + d* sum(xant[j]*weight[j][i])

    - weight[j][i] : weight of arc between j and i
    - v[i]: personalization value, v[10]=1 and v[i]=0 if i !=10
    - xant[j] : score value of node j at previous iteration (x0[10]=1-d and x0[i]=0 if i !=10)

We consider d=0.85 and perform the calculation for 5 iterations (maxiter=5).

### Computation of the recommendation vector x

In [None]:
# Exercise cell -- Personalized PageRank from `user` (each `...` is a placeholder
# still to be written). d is the damping factor, maxiter the number of iterations.
import pandas as pd
from pyspark.sql.functions import when

user = 10
d=0.85
maxiter = 5
# Build the initial importance vector
# NOTE(review): the statement gives x0[10] = 1-d, but the vector starts at 1 here;
# the reference "Initial importance" output also shows 1 -- confirm which is intended.
x0  = spark.createDataFrame(pd.DataFrame([(user,1)], columns=["id","rank"]))

print("Initial importance")
x0.show()

x = x0
# NOTE: the loop variable `iter` rebinds Python's builtin iter() at notebook level.
for iter in range(maxiter) :

    # one update step: x[i] = (1-d)*v[i] + d*sum_j(xant[j]*weight[j][i])
    nextx = ...

    x = ...


x = x.persist()
print("Final importance")
x.orderBy(col("rank").desc()).show()



```
# Result:

Initial importance
+---+----+
| id|rank|
+---+----+
| 10|   1|
+---+----+

Final importance
+------+--------------------+
|    id|                rank|
+------+--------------------+
|    10| 0.15000000000000002|
|839649| 0.04804175618196067|
|828318| 0.04146198925647322|
|960353|0.038525490989983785|
|960214|0.037971415875705114|
|955486|  0.0344843898841708|
|855194| 0.02795106930767553|
|958924|0.025842628563049787|
|801772|0.025352473837919276|
|807650|0.021943036028298665|
|823737|0.021943036028298665|
|901153| 0.02054042805530674|
|984123|           0.0159375|
|972772|           0.0159375|
|949111|0.014455449194238305|
|955858|0.014255786458993664|
|849768|0.009970758893824672|
|824440| 0.00921446638329685|
|969620|          0.00796875|
|941064|       0.00737109375|
+------+--------------------+
only showing top 20 rows
```

### Building a list of recommended songs that the user hasn't listened to.
Display the 10 most recommended songs that the user has not listened to yet, together with their
previously computed score.

In [None]:

# Exercise cell -- keep only the recommended tracks the user has not listened to
# (each `...` is a placeholder still to be written). The reference output prints
# id, score, track name and artist, presumably joined from `meta`.
...
tracksrec = ...


# Global ranking: this window has no partitionBy, so Spark evaluates it on a
# single partition.
window = Window.orderBy(col("rank").desc())


tracksreckList = tracksrec.withColumn("position", dense_rank().over(window))\
       ....take(10)

# Each collected row must unpack into exactly four fields for this format string.
for val in tracksreckList:
   print("%s %s %s %s" % val)

```
# Result:

949111 0.01445053790893624 Hah Heh Hah Artist: Vaya Con Dios
955858 0.014255786458993664 Legz Artist: Jaffa
849768 0.00997048013738679 martin Artist: Dani Martin
803682 0.005324306187499334 Every Morning Artist: Sugar Ray
926933 0.004131316860041196 Who You Want Artist: Qulinez
834413 0.004061852931960852 Bluebells Artist: Patrick Wolf
855373 0.004002950559461102 Marat Artist: Eminem
816283 0.003639860297309027 Requiem Pour Un Fou Artist: Johnny Hallyday
964744 0.0032711386413310814 Perdono Artist: Tiziano Ferro
893155 0.0030469417163617577 Quit   ft Ariana Grande Artist: Cashmere Cat
```

## Computing triangles

Implement the various steps of the improved algorithm for calculating triangles
presented in class on the trackTrack graph constructed earlier.

In [None]:
# Define a function that takes a list of ids sorted in ascending order (`users`;
# in the triangle algorithm below these are the neighbours of a track) and
# returns the list of its ordered pairs.
# Note: each pair [a,b] is represented by the string "[a,b]".
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def parse_string(users):
    """Return every ordered pair of ``users`` encoded as a "[a,b]" string.

    ``users`` is expected to be sorted in ascending order, so in each pair
    ``a`` comes before ``b`` in the list (``a < b`` for distinct sorted ids).
    Pairs are listed in the order of ``itertools.combinations``, e.g.
    ``[1, 2, 3] -> ["[1,2]", "[1,3]", "[2,3]"]``.

    Parameters
    ----------
    users : list or None
        Sorted ids (in the triangle algorithm: the neighbours of one track).

    Returns
    -------
    list of str
        One "[a,b]" string per pair; empty for ``None`` or fewer than 2 ids.
    """
    from itertools import combinations

    if not users:
        return []
    results = ["[%s,%s]" % (a, b) for a, b in combinations(users, 2)]
    return results

# Wrap parse_string as a Spark UDF returning array<string>, so it can be applied
# to the sorted neighbour-list column in the triangle computation below.
parse_string_udf = udf(parse_string, ArrayType(StringType()))

In [None]:
# Implementing the triangle computation algorithm

from pyspark.sql.functions import collect_list, sort_array, explode

#Map1 - see slides
#Consider only ordered pairs (trackId, track1) (trackId < track1)
trackOrd = ...

#Reduce 1 - slides: Build for each trackId the list of ordered pairs of its neighbors
# a) group lines by trackId by building a list of neighbors (>trackId) sorted in ascending order (use sort_array)
neighbors = ...


# b) use the function defined above to return the list of possible pairs of neighbors
couples=...

# Map2 + Reduce 2 - see slides
# consider only those rows where the neighbor pairs previously constructed also exist in the graph (join)
from pyspark.sql.functions import concat, lit, count, desc
liste = ...

# Compute the number of triangles for each user and sort the result by decreasing number of triangles.
triangles = ...

triangles.count() #7156

triangles.orderBy(col("nb_triangles").desc()).show()

```
# Result:

+-------+------------+
|trackid|nb_triangles|
+-------+------------+
| 800288|        1161|
| 808082|        1068|
| 805688|         925|
| 806854|         917|
| 815388|         875|
| 825174|         854|
| 831005|         762|
| 805959|         656|
| 798800|         650|
| 798517|         636|
| 799541|         625|
| 801571|         595|
| 846624|         592|
| 841340|         585|
| 810775|         574|
| 844296|         566|
| 802599|         565|
| 811513|         554|
| 858904|         549|
| 813969|         524|
+-------+------------+
only showing top 20 rows
```

## Single Source Shortest Paths

Implement the parallel algorithm for calculating the shortest path from a fixed origin to any destination

In [None]:
# Exercise cell -- single-source shortest paths from `origin` over `graph`
# (each `...` is a placeholder still to be written).
from pyspark.sql import functions as F
import pandas as pd

from graphframes import GraphFrame
from graphframes.lib import AggregateMessages as AM

origin=10

#Gather all nodes in a single DF
# (every id appearing as source or dest of `graph`, in a column named "id" as used below)
vertices = ...
# Vector of distances of all nodes from origin
# 0 for the origin, +infinity for every other node until a path reaches it.
distances = vertices.withColumn("distance", F.when(vertices["id"] == origin, 0).otherwise(float("inf")))
# Nodes to expand in the next round (presumably those whose distance just
# improved); initially only the origin, at distance 0.
active  = spark.createDataFrame(pd.DataFrame([(origin,0)], columns=["idA","distanceA"]))

# getCachedDataFrame returns a cached copy of the DataFrame (GraphFrames helper for
# iterative algorithms), which keeps the query plan from growing at each iteration.
distances= AM.getCachedDataFrame(distances)

i=0
# Loop while `active` is non-empty: first() returns None on an empty DataFrame.
# NOTE(review): `i` is never incremented in the visible template.
while active.first():
  ...
  distances = ...

  active=AM.getCachedDataFrame(...)
  distances=AM.getCachedDataFrame(...)


print("Final distances:")
distances.filter(col("distance")!=float('inf')).orderBy(col("distance")).show()

```
# Result:

Final distances:
+------+--------+
|    id|distance|
+------+--------+
|    10|     0.0|
|839649| 0.03125|
|807650| 0.03125|
|823737| 0.03125|
|824440| 0.03125|
|828318| 0.03125|
|901153| 0.03125|
|801772| 0.03125|
|855194| 0.03125|
|934050| 0.03125|
|941064| 0.03125|
|943645| 0.03125|
|955486| 0.03125|
|956604| 0.03125|
|958924| 0.03125|
|960214| 0.03125|
|960353| 0.03125|
|970381| 0.03125|
|976111| 0.03125|
|983989| 0.03125|
+------+--------+
only showing top 20 rows
```