In [1]:
from pyspark.sql import SparkSession
import pandas as pd

# jupyter notebook full-width display
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

# pandas formatting
pd.set_option('display.float_format', '{:.2f}'.format)
# NOTE: underscore separaters ('_') are better than commas (',') because 
# numbers with underscores work in Python without any extra effort.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 200)

In [2]:
# spark = SparkSession.builder.appName('PySparkPandas').getOrCreate()
# needs more memory
spark = SparkSession.builder.appName("PySparkPandas").config("spark.driver.memory", "32g").getOrCreate()
# spark.sparkContext.getConf().getAll()

In [3]:
# df_pyspark = spark.read.csv('every_song_with_data.csv', header=True, inferSchema=True)
# df_pyspark.printSchema()

In [4]:
# fix filetypes
from pyspark.sql.types import *

schema = StructType([
    StructField('GARBAGE', IntegerType(), True),
    StructField('id', StringType(), True),
    StructField('song', StringType(), True),
    StructField('artist', StringType(), True),
    StructField('acousticness', FloatType(), True),
    StructField('danceability', FloatType(), True),
    StructField('duration_ms', FloatType(), True),
    StructField('energy', FloatType(), True),
    StructField('instrumentalness', FloatType(), True),
    StructField('key', FloatType(), True),
    StructField('liveness', FloatType(), True),
    StructField('loudness', FloatType(), True),
    StructField('mode', FloatType(), True),
    StructField('speechiness', FloatType(), True),
    StructField('tempo', FloatType(), True),
    StructField('time_signature', FloatType(), True),
    StructField('valence', FloatType(), True)
])

# this is not importing correctly - it is utf-8, not sure why i can't get it to work...
# https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html
df_pyspark = spark.read.csv('every_song_with_data.csv', header=True, schema=schema, encoding='utf-8')
df_pyspark.printSchema()

root
 |-- GARBAGE: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- song: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- acousticness: float (nullable = true)
 |-- danceability: float (nullable = true)
 |-- duration_ms: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- key: float (nullable = true)
 |-- liveness: float (nullable = true)
 |-- loudness: float (nullable = true)
 |-- mode: float (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- tempo: float (nullable = true)
 |-- time_signature: float (nullable = true)
 |-- valence: float (nullable = true)



In [5]:
df_pyspark = df_pyspark.drop('GARBAGE')
df_pyspark.printSchema()

root
 |-- id: string (nullable = true)
 |-- song: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- acousticness: float (nullable = true)
 |-- danceability: float (nullable = true)
 |-- duration_ms: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- key: float (nullable = true)
 |-- liveness: float (nullable = true)
 |-- loudness: float (nullable = true)
 |-- mode: float (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- tempo: float (nullable = true)
 |-- time_signature: float (nullable = true)
 |-- valence: float (nullable = true)



In [6]:
df_pandas = pd.read_csv('every_song_with_data.csv')
df_pandas = df_pandas[df_pandas.columns[1:]]

In [7]:
df_pandas.dtypes

id                   object
song                 object
artist               object
acousticness        float64
danceability        float64
duration_ms         float64
energy              float64
instrumentalness    float64
key                 float64
liveness            float64
loudness            float64
mode                float64
speechiness         float64
tempo               float64
time_signature      float64
valence             float64
dtype: object

In [8]:
# # this runs out of memory, but when you add more memory, it just doesn't work (.head() = error)
# df_pyspark = spark.createDataFrame(df_pandas, schema=schema)
# df_pyspark.printSchema()

### COMPARISONS

In [9]:
%%time
df_pyspark.count()

Wall time: 3.62 s


9595992

In [10]:
%%time
df_pandas.count()[0]

Wall time: 1.02 s


9595992

In [11]:
%%time
df_pyspark.filter('danceability>0.1').agg({'loudness':'count'}).show()
# df_pyspark.filter('danceability>0.1').agg({'loudness':'mean'}).show()

+---------------+
|count(loudness)|
+---------------+
|        9467923|
+---------------+

Wall time: 5.8 s


In [12]:
%%time
df_pandas[df_pandas.danceability>0.1].loudness.count()
# df_pandas[df_pandas.danceability>0.1].loudness.mean()

Wall time: 1.06 s


9479314

In [13]:
%%time
df_pyspark.filter('danceability>0.9').agg({'loudness':'count'}).show()
# df_pyspark.filter('danceability>0.9').agg({'loudness':'mean'}).show()

+---------------+
|count(loudness)|
+---------------+
|          98089|
+---------------+

Wall time: 4.09 s


In [14]:
%%time
df_pandas[df_pandas.danceability>0.9].loudness.count()
# df_pandas[df_pandas.danceability>0.9].loudness.mean()

Wall time: 73.8 ms


81449

In [15]:
%%time
df_pyspark.describe()

Wall time: 1min 46s


DataFrame[summary: string, id: string, song: string, artist: string, acousticness: string, danceability: string, duration_ms: string, energy: string, instrumentalness: string, key: string, liveness: string, loudness: string, mode: string, speechiness: string, tempo: string, time_signature: string, valence: string]

In [16]:
%%time
df_pyspark.describe().toPandas()
# the data is completely mangled

Wall time: 1min 36s


Unnamed: 0,summary,id,song,artist,acousticness,danceability,duration_ms,energy,instrumentalness,key,liveness,loudness,mode,speechiness,tempo,time_signature,valence
0,count,9595992,9592026,9595606,9562446.0,9584134.0,9591218.0,9593595.0,9594588.0,9595163.0,9595485.0,9595633.0,9595710.0,9595824.0,9595893.0,9595908.0,9595919.0
1,mean,,,,0.4202998270634586,0.5309619621326106,237472.3932000001,550.5764407441259,182.31997574114484,66.29801924915428,25.619303251995973,4.993114946851147,8.860441357223824,3.7017310567163153,120.21270096005216,6.912010068779144,2.570577689189665
2,stddev,,,,0.3790116508812255,0.6646282460134003,159475.43899539037,14986.578948389351,8614.123580993457,4994.874524393464,3110.2100880945936,2602.63596566388,1754.829824496312,1143.9188145955427,929.5529524710596,1069.7799185812169,894.5822740572261
3,min,0000QBRGPosiFRXKmMYnsO,!,"""""""""Appassionata"""""""" - Allegro Assai """"""",0.0,0.0,0.0,0.0,0.0,0.0,0.0,-60.0,-50.955,-53.606,-49.33,-47.873,-35.25
4,max,7zzzxEQAsiuIimY7AiTGJ9,��迌,ｄｅｔｒｏｉｔ７,144.0,1970.0,19672058.0,3610500.0,2180000.0,2263853.0,1857000.0,2079960.0,1392693.0,1590733.0,1248173.0,1089066.0,904893.0


In [17]:
%%time
df_pandas.describe()

Wall time: 3.17 s


Unnamed: 0,acousticness,danceability,duration_ms,energy,instrumentalness,key,liveness,loudness,mode,speechiness,tempo,time_signature,valence
count,9595992.0,9595992.0,9595992.0,9595992.0,9595992.0,9595992.0,9595992.0,9595992.0,9595992.0,9595992.0,9595992.0,9595992.0,9595992.0
mean,0.42,0.53,238209.59,0.54,0.26,5.24,0.21,-10.89,0.66,0.1,118.56,3.84,0.48
std,0.38,0.19,159341.59,0.28,0.37,3.54,0.18,6.36,0.47,0.14,31.03,0.57,0.28
min,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-60.0,0.0,0.0,0.0,0.0,0.0
25%,0.03,0.4,169600.0,0.31,0.0,2.0,0.1,-13.68,0.0,0.04,95.08,4.0,0.23
50%,0.34,0.55,216933.0,0.57,0.0,5.0,0.13,-9.2,1.0,0.05,118.95,4.0,0.47
75%,0.82,0.68,275080.0,0.79,0.64,8.0,0.26,-6.4,1.0,0.08,137.45,4.0,0.71
max,1.0,1.0,19672058.0,1.0,1.0,11.0,1.0,7.23,1.0,0.97,249.99,5.0,1.0


### CONCLUSIONS:

* Pandas is (way) faster on this computer without any multithreading / distributed processing
* They give different answers for some reason...
    * PySpark isn't importing correctly even though Pandas can, and both default to utf-8 enconding (the enconding for the csv)
    * converting the pandas dataframe to pyspark worked even worse
        * super slow, memory error without changing config, errors 100% of the time (even .head())

### "De"-Bugging
more like bug watching

In [18]:
df_pyspark.filter('danceability>0.99').agg({'loudness':'count'}).show()

+---------------+
|count(loudness)|
+---------------+
|           2779|
+---------------+



In [19]:
df_pandas[df_pandas.danceability>0.99].loudness.count()

91

In [20]:
df_pandas[df_pandas.danceability>=0.99].loudness.count()

91

In [21]:
df_pyspark99 = df_pyspark.filter(df_pyspark.danceability > 0.99)

In [22]:
df_pyspark99.count()

2779

In [23]:
df_pyspark99_pandas = df_pyspark99.toPandas()

In [24]:
list_spark99 = df_pyspark99_pandas.id.tolist()

In [25]:
df_pandas[df_pandas.id.isin(list_spark99)].describe()

Unnamed: 0,acousticness,danceability,duration_ms,energy,instrumentalness,key,liveness,loudness,mode,speechiness,tempo,time_signature,valence
count,2779.0,2779.0,2779.0,2779.0,2779.0,2779.0,2779.0,2779.0,2779.0,2779.0,2779.0,2779.0,2779.0
mean,0.97,0.38,200909.71,0.11,0.58,5.08,0.14,-23.09,0.73,0.06,103.52,3.66,0.29
std,0.15,0.18,170053.31,0.12,0.4,3.33,0.11,6.13,0.44,0.06,31.7,0.82,0.25
min,0.0,0.0,10453.0,0.0,0.0,0.0,0.03,-47.23,0.0,0.0,0.0,0.0,0.0
25%,0.99,0.26,79780.5,0.03,0.04,2.0,0.09,-27.06,0.0,0.04,77.12,3.0,0.08
50%,0.99,0.35,160640.0,0.08,0.85,5.0,0.11,-23.0,1.0,0.05,98.12,4.0,0.21
75%,0.99,0.46,264460.0,0.15,0.91,8.0,0.15,-19.08,1.0,0.06,126.24,4.0,0.42
max,1.0,1.0,1456053.0,0.93,0.99,11.0,0.94,-3.79,1.0,0.91,216.65,5.0,1.0
