<h1>Carga de las bases de datos, filtrado y unión de datos en un único archivo.</h1>

<h2>Importación de librerías</h2>

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext

# Carga del paquete databricks:spark-xml para pasar directamente de XML a dataframe
import os
from os import environ
environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.10:0.4.1 pyspark-shell'

import pyspark.sql.functions as funct


<h2>Configuración de Spark</h2>

In [3]:
#conf = SparkConf()\
#        .setAppName("tfm00")\
#        .setMaster("local")

conf = SparkConf()\
        .setAppName("tfm02")\
        .setMaster("spark://192.168.2.132:7077")
    
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

<h2>Carga del fichero XML Master</h2> 

In [22]:
#Definición de los archivos XML 
fullMasterXmlFile = 'Discogs/masters/discogs_20190701_masters.xml'
sampleMasterXmlFile = 'Discogs/masters/sample_masters.xml'

dfMasters = sqlContext.read.format('com.databricks.spark.xml')\
    .option("excludeAttribute", "false")\
    .options(rootTag='masters')\
    .options(rowTag='master')\
    .load(fullMasterXmlFile)
 
#dfMasters.printSchema()

<h3>Selección de los atributos necesarios y procesado de datos</h3>

In [23]:
#Seleccionamos los atributos sin ningún tipo de procesado. 

finaldfMasters = dfMasters.select("main_release","title","artists","data_quality","genres", "styles", "year")\
            .withColumn("master_title", funct.col("title"))\
            .withColumn("master_artists", funct.col("artists.artist.name"))\
            .withColumn("genres", funct.col("genres.genre"))\
            .withColumn("styles", funct.col("styles.style"))\
            .drop("title","artists")\
            .orderBy("main_release")

#finaldfMasters.count()


In [25]:
#Eliminamos posibles filas duplicadas

finaldfMasters = finaldfMasters.drop_duplicates(['main_release'])
#finaldfMasters.count()

<h4>Generación del archivo Masters con los datos que nos interesan. </h4> 

In [26]:
jsonPath = "MastersCleaned"

finaldfMasters\
    .repartition(1)\
    .coalesce(1)\
    .write\
    .format("json")\
    .save(jsonPath)

In [27]:
jsonFileName = jsonPath + ".json"

#Unión de los archivos parciales en un único csv
os.system("cat " + jsonPath + "/p* > " + jsonPath + "/" + jsonFileName)

0

<h2>Carga del fichero XML Releases</h2> 

In [86]:

#Definición de los archivos XML 
fullReleaseXmlFile = 'Discogs/releases/discogs_20190701_releases.xml'
sampleReleaseXmlFile = 'Discogs/releases/sample_releases.xml'

dfReleases = sqlContext.read.format('com.databricks.spark.xml')\
    .option("excludeAttribute", "false")\
    .options(rootTag='releases')\
    .options(rowTag='release')\
    .load(sampleReleaseXmlFile)
 
#dfReleases.printSchema()

<h3>Selección de los atributos necesarios y procesado de datos</h3>

In [89]:
# Seleccionamos los atributos. 
# Se carga en la columna companies todas las compañías participantes en la publicación (distribuidora, productora, diseñador, etc.)
# Se carga en la columna tracklist la duración de las canciones que compone la publicación

finaldfReleases = dfReleases.select("master_id","title","artists","data_quality","formats","country","companies","tracklist")\
            .withColumn("master_id", funct.col("master_id._VALUE"))\
            .withColumn("formats", funct.col("formats.format._name"))\
            .withColumn("artists", funct.col("artists.artist.name"))\
            .withColumn("companies", funct.explode(funct.col("companies.company")))\
            .withColumn("tracklist", funct.explode(funct.col("tracklist.track.duration")))\
            
finaldfReleases = finaldfReleases.where(funct.col("master_id").isNotNull())

#finaldfReleases.count()

In [90]:
# Nos quedamos solo con las distribuidoras, quitamos así otro tipo de compañías que toman parte del proceso de la publicación

finaldfReleases = finaldfReleases\
            .withColumn("companies", 
                funct.when(funct.col("companies.entity_type_name") == "Distributed By", funct.col("companies.name")))\
            .dropna()

#finaldfReleases.count()

In [81]:
# Redondeamos la duración de las canciones de la publicación para poder sumarlas más adelante y calcular el total

finaldfReleases = finaldfReleases.withColumn("tracklist", funct.to_timestamp("tracklist","mm:ss"))

finaldfReleases = finaldfReleases\
                    .withColumn("tracklist_minutes", 
                                funct.when(funct.second(finaldfReleases["tracklist"]) < 30, funct.minute(finaldfReleases["tracklist"]))\
                                   .otherwise(funct.minute(finaldfReleases["tracklist"])+1))\
                    .drop("tracklist")
                    
#finaldfReleases.count()


In [82]:
# Sumamos la duración de las canciones de la publicación y creamos una nueva columna llamada duration

finaldfReleases = finaldfReleases\
    .groupBy("master_id","title", "artists", "formats", "country", "companies")\
    .agg(funct.sum("tracklist_minutes").alias("duration"), funct.count("tracklist_minutes").alias("number_tracks"))\
    
#finaldfReleases.count()

In [83]:
# Nos quedamos con la compañía más importante, eliminando las demás

finaldfReleases = finaldfReleases\
    .drop_duplicates(['master_id'])\
    .orderBy('master_id')

#finaldfReleases.count()

<h4>Generación del archivo Masters con los datos que nos interesan. </h4> 

In [84]:
jsonPath = "ReleasesCleaned"

finaldfReleases\
    .repartition(1)\
    .coalesce(1)\
    .write\
    .format("json")\
    .save(jsonPath)

In [85]:
jsonFileName = jsonPath + ".json"

#Unión de los archivos parciales en un único csv
os.system("cat " + jsonPath + "/p* > " + jsonPath + "/" + jsonFileName)

0

<h1>Unión de las tablas en una única sola</h1>

In [124]:
#Masters Cleaned

mastersCleanedFile = "MastersCleaned/MastersCleaned.json"

dfMastersCleaned = sqlContext.read.format("json")\
                        .load(mastersCleanedFile)

#dfMastersCleaned.show(5)


In [125]:
releasesCleanedFile = "ReleasesCleaned/ReleasesCleaned.json"

dfReleasesCleaned = sqlContext.read.format("json")\
                        .load(releasesCleanedFile)

#dfReleasesCleaned.show(5)

In [126]:
dfMastersReleases = dfMastersCleaned.alias("masters")\
                    .join(dfReleasesCleaned.alias("releases"), dfMastersCleaned["main_release"] == dfReleasesCleaned["master_id"], how='inner')\
                    .drop("main_release")

#dfMastersReleases.show(5)

In [127]:
dfMastersReleases = dfMastersReleases.select("master_id", 
                                           "title", 
                                           "year", 
                                           "formats", 
                                           "duration", 
                                           "number_tracks", 
                                           "artists",  
                                           "country",   
                                           "companies", 
                                           "genres", 
                                           "styles")\
                                    .orderBy("master_id")

<h5>Creamos un fichero uniendo la información en release y en master.</h5>

<p>De esta manera tenemos los datos que se van a utilizar en una única tabla.</p>

In [128]:
jsonPath = "MastersReleasesJoined"

dfMastersReleases\
    .repartition(1)\
    .coalesce(1)\
    .write\
    .format("json")\
    .save(jsonPath)

In [129]:
jsonFileName = jsonPath + ".json"

#Unión de los archivos parciales en un único csv
os.system("cat " + jsonPath + "/p* > " + jsonPath + "/" + jsonFileName)

0