***PROCESAMIENTO PARA OBTENCIÓN DE TABLA ORIGEN***

A continuación se detalla el procedimiento para obtener la tabla origen(datos crudos), la cual se establece a partir de una muestra con 10.000 registros del dataset original. Dicha muestra ya estaba dada como subconjunto de datos con formato JSON en el reto de Spotify.

Detallaremos los pasos necesarios para descargar el archivo, descomprimirlo, procesarlo para obtener la tabla rigen y, finalmente, descargar dicha tabla en formato CSV para trabajar con ella posteriormente.

In [None]:
%config InlineBackend.figure_format = 'retina'

In [None]:
#Se importa la biblioteca drive de Google Colab y se utiliza la función drive.mount('/content/drive') para montar Google Drive en el entorno y poder acceder a archivos y directorios almacenados en Google Drive desde el entorno de Colab.
#Al ejecutar esta línea, se le pedirá que proporcione una autorización para acceder a Google Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#Se importan las bibliotecas Pathlib y Os para trabajar con rutas de archivos.

from pathlib import Path
import os

#Se utiliza Path(os.path.expanduser("~")) / "Downloads" para obtener la ruta de la carpeta de descargas en Windows.
#En esta ruta se almacena en la variable basepath
basepath = Path(os.path.expanduser("~")) / "Downloads"

#Se construyen las rutas completas a dos archivos específicos, 'spotify_million_playlist_dataset_challenge.zip' y 'spotify_million_playlist_dataset.zip', en la carpeta de descargas.
#Estas rutas completas se almacenan en las variables challenge_dataset_path y full_dataset_path.
challenge_dataset_path = basepath / 'spotify_million_playlist_dataset_challenge.zip'
full_dataset_path = basepath / 'spotify_million_playlist_dataset.zip'

#Se muestra la ruta de la carpeta de descargas (basepath) en la salida para verificar la ubicación de la carpeta en el sistema de archivos.
basepath

PosixPath('/root/Downloads')

In [None]:
#Instalación de Java (OpenJDK 8) en el entorno de ejecución de Colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

#Descargar Apache Spark desde una URL y guardar el archivo en la carpeta de descargas
!wget  https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz -P '{str(basepath)}'

#Instalar la biblioteca FindSpark para simplificar la integración de Apache Spark con e el entorno de Python en Colab
!pip install  findspark



--2023-11-10 04:45:33--  https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400395283 (382M) [application/x-gzip]
Saving to: ‘/root/Downloads/spark-3.5.0-bin-hadoop3.tgz.12’


2023-11-10 04:45:36 (182 MB/s) - ‘/root/Downloads/spark-3.5.0-bin-hadoop3.tgz.12’ saved [400395283/400395283]



In [None]:
#Descomprimimos el conjunto de datos, el cual se encuentra en una carpeta comprimida llamada 'spotify_million_playlist_dataset_challenge.zip'.
#Esta carpeta se encuentra en el link https://drive.google.com/file/d/1VTgm7UdzeTlqfrymG_o_niVIK_EeYFM6/view?usp=drive_link, debe descargarse y subirse a Colab previamente.

!time unzip  '/content/spotify_million_playlist_dataset_challenge.zip'




Archive:  /content/spotify_million_playlist_dataset_challenge.zip
replace md5? [y]es, [n]o, [A]ll, [N]one, [r]ename: 
real	0m3.644s
user	0m0.000s
sys	0m0.002s


In [None]:
#Verificamos ubicación del archivo y contenido de la carpeta descomprimida
!pwd
!ls

/content
challenge_set.json  md5		 spark-3.5.0-bin-hadoop3
check.py	    nohup.out	 spotify_million_playlist_dataset_challenge.zip
drive		    README.md	 verify_submission.py
jprq_output	    sample_data


In [None]:
#Se descarga e instala Apache Spark en el entorno de Colab
!wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!mv spark-3.5.0-bin-hadoop3 /opt/


mv: cannot move 'spark-3.5.0-bin-hadoop3' to '/opt/spark-3.5.0-bin-hadoop3': Directory not empty


In [None]:
#Se hace una verificación de actualizaciones
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu jammy-security InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Connected to cloud.r-project.org (52.85.151.                                                                                                    Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.52)] [Waiting for heade                                                                                                    Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy In

In [None]:
#Configuramos las variables de entorno para Spark y Java
import os
os.environ["SPARK_HOME"] = "/opt/spark-3.5.0-bin-hadoop3"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"


In [None]:
#Se descarga la librería Findspark para inicializar Spark en el entorno de Python
!pip install findspark




In [None]:
#Se inicializa una sesión de Spark
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Colab") \
    .config("spark.driver.memory", "9g") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()


In [None]:
#Configuramos SPARK_HOME con la ruta de instalación de Spark
#Importamos Findspark y creamos la sesión de Spark
import os

import findspark
findspark.init()

from pyspark.sql import SparkSession
import findspark
findspark.init()
findspark.find()
from pyspark.sql import SparkSession
spark = SparkSession.builder\
         .master("local")\
         .appName("Colab")\
         .config("spark.driver.memory", "9g")\
         .config('spark.ui.port', '4050')\
         .getOrCreate()
spark




In [None]:
#Se instala la biblioteca jprq
!pip install -q jprq

In [None]:
#Se establecen configuraciones para acceso a la interfaz web de Spark UI desde un navegador externo
!nohup bash -c 'script -c "jprq tcp 4050" -f jprq_output' &
!sleep 2

nohup: appending output to 'nohup.out'


In [None]:
!ps -aux | grep jprq

root       73998  0.0  0.0   7372  3472 ?        S    04:46   0:00 /bin/bash -c ps -aux | grep jprq
root       74000  0.0  0.0   6480  2304 ?        S    04:46   0:00 grep jprq


In [None]:
!cat jprq_output

Script started on 2023-11-10 04:46:30+00:00 [<not executed on terminal>]

[1;36m  [0m[1;36m([0m[1;36m_[0m[1;36m)[0m[1;36m_ __  _ __ __ _ [0m
[1;36m  | | [0m[1;36m'_ \| '[0m[1;36m__/ _` |[0m
[1;36m  | | |_[0m[1;36m)[0m[1;36m | | | [0m[1;36m([0m[1;36m_| |[0m
[1;36m _/ | .__/|_|  \__, |[0m
[1;36m|__/|_|           |_|[0m
[1;36m        v2.[0m[1;36m1.0[0m

[1;33mPress Ctrl+C to quit.[0m

Traceback (most recent call last):
  File "/usr/local/bin/jprq", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/dist-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.10/dist-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.10/dist-packages/click/core.py", line 1395, 

In [None]:
#Se instala Pyspark para  procesamiento y análisis de datos utilizando Spark desde Python
!pip install pyspark

import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import pandas as pd



In [None]:
#Se crea el dataset con el la muestra de datos proporcionada por Spotify
dataset_json_path = '/content/challenge_set.json'


In [None]:
num_followers_col_name = "num_holdouts"
# num_followers_col_name = "num_followers"

In [None]:
#Se hace procesamiento de datos con Spark desde el archivo Json con la muestra de los datos
df = spark.read.json(str(dataset_json_path), multiLine=True)

from pyspark.sql import SparkSession


spark = SparkSession.builder\
    .appName("Colab")\
    .getOrCreate()



In [None]:
#Se verifica el esquema del dataset
df.printSchema()
df.count()

root
 |-- date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- name: string (nullable = true)
 |-- playlists: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_holdouts: long (nullable = true)
 |    |    |-- num_samples: long (nullable = true)
 |    |    |-- num_tracks: long (nullable = true)
 |    |    |-- pid: long (nullable = true)
 |    |    |-- tracks: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- album_name: string (nullable = true)
 |    |    |    |    |-- album_uri: string (nullable = true)
 |    |    |    |    |-- artist_name: string (nullable = true)
 |    |    |    |    |-- artist_uri: string (nullable = true)
 |    |    |    |    |-- duration_ms: long (nullable = true)
 |    |    |    |    |-- pos: long (nullable = true)
 |    |    |    |    |-- track_name: string (nullable = true)
 |    |    |    |    

1

In [None]:
#Creamos un dataframe con la información de las playlist
playlists_df = (
    df
        .select(F.explode("playlists").alias('playlist'))
        .select(F.col("playlist.*"))
)
playlists_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- num_holdouts: long (nullable = true)
 |-- num_samples: long (nullable = true)
 |-- num_tracks: long (nullable = true)
 |-- pid: long (nullable = true)
 |-- tracks: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- album_name: string (nullable = true)
 |    |    |-- album_uri: string (nullable = true)
 |    |    |-- artist_name: string (nullable = true)
 |    |    |-- artist_uri: string (nullable = true)
 |    |    |-- duration_ms: long (nullable = true)
 |    |    |-- pos: long (nullable = true)
 |    |    |-- track_name: string (nullable = true)
 |    |    |-- track_uri: string (nullable = true)



In [None]:
#Verificamos el tamaño de la muestra del dataset
playlists_df.count()

10000

In [None]:
#Creamos un dataframe incluyendo sólo las filas donde la columna "tracks" tiene un tamaño mayor que cero.
#Esto asegura que sólo se seleccionen listas de reproducción que contienen pistas.
#Se seleccionan columnas específicas del DataFrame resultante, incluyendo el nombre de la lista de reproducción, el identificador de la lista de reproducción, el número de seguidores de la lista de reproducción
#y todos los datos relacionados con las pistas de la lista de reproducción
playlist_tracks_df = (
    playlists_df
        .where(F.size(F.col("tracks")) > 0) # para
        .select("*", F.explode("tracks").alias('track'))
        .select(F.col("name").alias("playlist_name"), F.col("pid").alias("playlist_pid"), F.col(num_followers_col_name).alias("playlist_num_followers"), F.col("track.*"))


)

#Se imprime el data frame
playlist_tracks_df.printSchema()
playlist_tracks_df.show()

root
 |-- playlist_name: string (nullable = true)
 |-- playlist_pid: long (nullable = true)
 |-- playlist_num_followers: long (nullable = true)
 |-- album_name: string (nullable = true)
 |-- album_uri: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_uri: string (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- pos: long (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_uri: string (nullable = true)

+-------------+------------+----------------------+--------------------+--------------------+--------------------+--------------------+-----------+---+--------------------+--------------------+
|playlist_name|playlist_pid|playlist_num_followers|          album_name|           album_uri|         artist_name|          artist_uri|duration_ms|pos|          track_name|           track_uri|
+-------------+------------+----------------------+--------------------+--------------------+--------------------+--------------------+-------

In [None]:
#Se guarda la tabla resultante como Tabla Origen en formato CSV
from pyspark.sql import SparkSession

#Creamos una sesión de Spark
spark = SparkSession.builder.appName("GuardarCSV").getOrCreate()

#Reemplazamos "ruta_del_archivo.csv" con la ubicación y nombre del archivo CSV de destino en Google Drive
ruta_csv = "/content/drive/My Drive/dataset_spotify.csv"

#Se escribe el DataFrame en un archivo CSV
playlist_tracks_df.write.csv(ruta_csv, header=True, mode="overwrite")

#Se cierra la sesión de Spark
spark.stop()

