In [222]:
from pyspark.sql import SparkSession

"""make an instance of a SparkSession called 'spark'."""
spark = SparkSession.builder.master('local').getOrCreate()


In [223]:
import os
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, FloatType

# custom_schema = StructType([
#     StructField('index', IntegerType()),
#     StructField('artist_popularity', LongType()),
#     StructField('followers', LongType()),
#     StructField('genres', StringType()),
#     StructField('id', StringType()),
#     StructField('name', StringType()),
#     StructField('track_id', StringType()),
#     StructField('track_name_prev', StringType()),
#     StructField('type', StringType())])
# Read the Spotify artists CSV file into a Spark DataFrame
spark_df = (spark.read.format("csv"
                        ).options(header="true"
                        ).schema("index int, artist_popularity int, followers int, genres string, id string, name string, track_id string, track_name_prev string, type string"
                        ).load("data/spotify_artists.csv"))


In [None]:

spark_df.withColumnRenamed('', 'index').columns



In [None]:
"""Profile the Data:"""
# Show a description (summary) of the Spark DataFrame.
spark_df.describe


In [224]:
# Print the schema of the DataFrame.
spark_df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- artist_popularity: integer (nullable = true)
 |-- followers: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- track_name_prev: string (nullable = true)
 |-- type: string (nullable = true)



In [None]:
# Select and show just the first 10 values in the 'name' and 'genres' columns.
spark_df.select('name', 'genres').show(8)


In [225]:
from pyspark.sql.functions import regexp_replace
# Where the genre is an empty list, replace it with ['elevator music'].
spark_df = spark_df.withColumn('genres', regexp_replace(
    'genres', r"\[\]", "['elevator music']"))
spark_df.select('name', 'genres').show(8)


+--------------------+--------------------+
|                name|              genres|
+--------------------+--------------------+
|       Juliano Cezar|['sertanejo', 'se...|
|      The Grenadines|  ['elevator music']|
|             Gangway| ['danish pop rock']|
|               FADES|['uk alternative ...|
| Jean-Pierre Guignon|  ['french baroque']|
|              Filhos|  ['elevator music']|
|                Eloq|  ['elevator music']|
|              Fravær|  ['elevator music']|
|       Camille Pépin|  ['elevator music']|
|Pepe Willberg & T...|['classic finnish...|
+--------------------+--------------------+
only showing top 10 rows



In [226]:
from pyspark.sql.functions import col
"""For the columns 'artist_popularity' and 'followers', cast the data type as integers."""
# done in the schema, but ill throw in the code, so you know i know it
spark_df = spark_df.withColumn('artist_popularity', col(
                    'artist_popularity').cast(IntegerType()))
spark_df = spark_df.withColumn('followers', col(
                    'followers').cast(IntegerType()))



In [240]:
# Sort the data in descending order by number of followers.
spark_df.select('*').sort('followers', ascending=False).show(5)


+-----+-----------------+---------+--------------------+--------------------+-------------+--------------------+---------------+------+
|index|artist_popularity|followers|              genres|                  id|         name|            track_id|track_name_prev|  type|
+-----+-----------------+---------+--------------------+--------------------+-------------+--------------------+---------------+------+
|55251|               92| 41561693|   ['pop', 'uk pop']|6eUKZXaKkcviH0Ku9...|   Ed Sheeran|7qiZfU4dY1lWllzX7...|       track_35|artist|
|53392|               98| 34680740|['canadian hip ho...|3TVXtAsR1Inumwj47...|        Drake|116H0KvKr2Zl4RPuV...|       track_71|artist|
|52620|               90| 30560149|['dance pop', 'po...|5pKCCKE2ajJHZ9KAi...|      Rihanna|2Ce5IyMlVRVvN997Z...|       track_38|artist|
|54447|               88| 26824224|['canadian pop', ...|1uNFoZAHBGtllmzzn...|Justin Bieber|3A7qX2QjDlPnazUsR...|        track_2|artist|
|42872|              100| 26309771|['dance pop',

In [251]:
"""process to change the artist popularity to percent and rename the column"""
from pyspark.sql.functions import udf

# user defined function to divide x by 100
pop_contest = udf(lambda x: x/100)
# apply our udf to the column artist popularity
pop_contest_df = spark_df.withColumn('artist_popularity', 
                            pop_contest(spark_df['artist_popularity']))
# Rename the column 'popularity_percent'.
pop_contest_df = spark_df.withColumnRenamed('artist_popularity', 'popularity_percent')
# change datatype of the percent column from a string to a float
pop_contest_df = pop_contest_df.withColumn('popularity_percent', pop_contest_df[
    'popularity_percent'].cast('float'))
# print the least cool kids to chow our function is working
pop_contest_df.select('popularity_percent', 'followers',
                      'name', 'genres').sort('popularity_percent').show(8)
# print schema to see if it changed to float
pop_contest_df.printSchema()

+------------------+---------+--------------------+------+
|popularity_percent|followers|                name|genres|
+------------------+---------+--------------------+------+
|               0.0|        4|         Adah Sharma|    []|
|               0.0|        0|       Kay Kay Menon|    []|
|               0.0|       23|      Woody Woodbury|    []|
|               0.0|        5|              Caelum|    []|
|               0.0|        1|        Chemeca Gant|    []|
|               0.0|        8|           Tobi Tobi|    []|
|               0.0|        4|Amitabh Bachchan,...|    []|
|               0.0|       19|      Shelly Winters|    []|
+------------------+---------+--------------------+------+
only showing top 8 rows

root
 |-- index: integer (nullable = true)
 |-- popularity_percent: float (nullable = true)
 |-- followers: long (nullable = true)
 |-- genres: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- track_id: string (nulla

In [242]:
custom_schema = StructType([
    StructField('index', IntegerType()),
    StructField('artist_popularity', LongType()),
    StructField('followers', LongType()),
    StructField('genres', StringType()),
    StructField('id', StringType()),
    StructField('name', StringType()),
    StructField('track_id', StringType()),
    StructField('track_name_prev', StringType()),
    StructField('type', StringType())])

"""Extract Information"""
spark_df = (spark.read.format("csv"
                              ).options(header="true"
                                        ).schema(custom_schema
                                                 ).load("data/spotify_artists.csv"))
spark_df.printSchema()

# Save as Parquet

# Your /data directory shouldn't be pushed to GitHub, so this file won't show up in the submitted repository.


root
 |-- index: integer (nullable = true)
 |-- artist_popularity: long (nullable = true)
 |-- followers: long (nullable = true)
 |-- genres: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- track_name_prev: string (nullable = true)
 |-- type: string (nullable = true)



In [243]:
# Show only the values in the DataFrame that have 'Queen' in the name.
spark_df.select('*').filter(spark_df.name.contains('Queen')).show(8)

+-----+-----------------+---------+--------------------+--------------------+--------------------+--------------------+---------------+------+
|index|artist_popularity|followers|              genres|                  id|                name|            track_id|track_name_prev|  type|
+-----+-----------------+---------+--------------------+--------------------+--------------------+--------------------+---------------+------+
|   40|               16|      695|                  []|4SK9OzAA0K00NVsXA...|       Queen Machine|6u3RWvO7ZIIdVci1N...|       track_56|artist|
|  901|               43|    18224|           ['strut']|71WL5bNm5jPPpwpDc...|  Bob the Drag Queen|5IsdA6g8IFKGmC1xl...|        track_8|artist|
| 1518|               30|     2297|           ['benga']|2FzYw9fn2ZtQ7sZma...|Muthoni Drummer Q...|4F0e4hx3bASeaqLqS...|       track_45|artist|
| 2152|               22|     3244|['afropop', 'kwai...|5LFWp4p0pMURif2d7...|Mahlathini & The ...|6WbcheHRcJNMaDIkO...|       track_15|artist|

In [247]:
spark_df = spark_df.withColumn('popularity_percent', spark_df[
    'popularity_percent'].cast('float'))
spark_df.printSchema()


AnalysisException: Cannot resolve column name "popularity_percent" among (index, artist_popularity, followers, genres, id, name, track_id, track_name_prev, type)

In [248]:
# Group the data by artist popularity, and show the count for each group.
spark_df = spark_df.withColumn('artist_popularity', spark_df[
    'artist_popularity'].cast('float'))
spark_df.printSchema()

spark_df.groupBy('artist_popularity').sum('artist_popularity').show(8)

root
 |-- index: integer (nullable = true)
 |-- artist_popularity: float (nullable = true)
 |-- followers: long (nullable = true)
 |-- genres: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- track_name_prev: string (nullable = true)
 |-- type: string (nullable = true)

+-----------------+----------------------+
|artist_popularity|sum(artist_popularity)|
+-----------------+----------------------+
|             18.0|               16020.0|
|             64.0|               20928.0|
|             82.0|                1886.0|
|             47.0|               46765.0|
|              9.0|                5292.0|
|             58.0|               27550.0|
|             39.0|               47034.0|
|              5.0|                2285.0|
+-----------------+----------------------+
only showing top 8 rows



In [232]:
# Save as Parquet
spark_df.write.parquet('data/spotify.parquet')
# Lastly, write the code to save the DataFrame as a Parquet file in the /data directory.


                                                                                