In [23]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf, col, sum, avg, count, max, when, desc, asc, lit
from pyspark.sql.types import StringType, DoubleType, FloatType

import os
import numpy as np
import pandas as pd
import tables
import time

In [24]:
#.config("spark.dynamicAllocation.enabled", True)\
#.config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
#.config("spark.shuffle.service.enabled", True)\

#spark = SparkSession.builder \
#    .appName("country_hotness") \
#    .config("spark.master", "yarn") \
#    .config("spark.submit.deployMode", "client") \
#    .config("spark.executor.cores", 1)\
#    .config("spark.cores.max", 1)\
#    .getOrCreate()

spark = SparkSession.builder \
    .appName("country_hotness") \
    .master("spark://192.168.2.110:7077") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.executor.cores", 1)\
    .config("spark.cores.max", 1)\
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)


In [25]:
start_time = time.time()

In [26]:
df = sqlContext.read.csv("hdfs://Group10-master:9000/user/hadoop/millionsongsubsetcountries.csv",
                        header='true', inferSchema='true')

df.show(vertical = True)

                                                                                

-RECORD 0---------------------------------------
 SongNumber              | 1                    
 SongID                  | SOMZWCG12A8C13C480   
 AlbumID                 | 300848               
 AlbumName               | Fear Itself          
 ArtistID                | ARD7TVE1187B99BFB1   
 ArtistLatitude          | null                 
 ArtistLocation          | California - LA      
 ArtistLongitude         | null                 
 ArtistName              | Casual               
 Danceability            | 0.0                  
 Hotttness               | 0.6021199899057548   
 Duration                | 218.93179            
 KeySignature            | 1                    
 KeySignatureConfidence  | 0.736                
 Tempo                   | 92.198               
 TimeSignature           | 4                    
 TimeSignatureConfidence | 0.778                
 Title                   | I Didn't Mean To     
 Year                    | 0                    
 Country            

In [27]:
df_filtered = df.filter("Country is not null").filter("Hotttness > 0")

df_filtered.show(1, vertical = True)
df_filtered.count()

[Stage 3:>                                                          (0 + 1) / 1]                                                                                

-RECORD 0---------------------------------------
 SongNumber              | 16                   
 SongID                  | SOHUOAP12A8AE488E9   
 AlbumID                 | 135122               
 AlbumName               | Outskirts            
 ArtistID                | ARD842G1187B997376   
 ArtistLatitude          | 43.64856             
 ArtistLocation          | Toronto Ontario C... 
 ArtistLongitude         | -79.38533            
 ArtistName              | Blue Rodeo           
 Danceability            | 0.0                  
 Hotttness               | 0.4051157216913865   
 Duration                | 491.12771            
 KeySignature            | 7                    
 KeySignatureConfidence  | 1.0                  
 Tempo                   | 119.826              
 TimeSignature           | 4                    
 TimeSignatureConfidence | 0.756                
 Title                   | Floating             
 Year                    | 1987                 
 Country            

                                                                                

1625

In [28]:


df_grouped = df_filtered.groupBy("Country")

In [29]:
df_top_sum = df_grouped.agg(sum("Hotttness").alias("total_hotness"), count("Country").alias("count"))

df_top_sum.sort(desc("total_hotness")).show()

[Stage 9:>                                                          (0 + 1) / 1]

+--------------------+------------------+-----+
|             Country|     total_hotness|count|
+--------------------+------------------+-----+
|United States of ...| 471.3887721084901| 1045|
|      United Kingdom| 90.78757398757114|  196|
|              Canada| 29.43225908243006|   62|
|              France|13.510679262160474|   31|
|             Germany|11.315300881904232|   27|
|             Jamaica|11.034167781078104|   31|
|              Sweden|10.418327763360482|   21|
|             Ireland| 8.821988482410477|   20|
|           Australia|  8.75111104387985|   21|
|           Argentina| 8.384042948798184|   20|
|              Greece| 6.078788776342014|   11|
|              Brazil| 5.602970735175841|   15|
|             Finland| 5.321946980983597|   11|
|                Cuba| 4.413528550856696|   13|
|         Puerto Rico| 4.131364404985561|    9|
|             Belgium|3.8092884555890523|   10|
|              Norway|3.7758161132743484|    9|
|              Mexico|3.2104779694519707

                                                                                

In [30]:
df_grouped_avg = df_grouped.agg(avg("Hotttness").alias("average_hotness"), count("Country").alias("count"))

df_grouped_avg.sort(desc("average_hotness")).show()

[Stage 10:>                                                         (0 + 1) / 1]

+--------------------+-------------------+-----+
|             Country|    average_hotness|count|
+--------------------+-------------------+-----+
|             Nigeria| 0.6640612814552243|    2|
|            Portugal| 0.6375469504072112|    3|
|                Mali| 0.5917089403475553|    2|
|               Ghana| 0.5763999812554107|    1|
|              Greece| 0.5526171614856377|   11|
|              Latvia| 0.5477982006297231|    2|
|          Tajikistan| 0.5375039041917102|    1|
|              Poland| 0.5366125827687278|    1|
|               Egypt| 0.5239170720288441|    3|
|                Peru| 0.5114243256864599|    1|
|              Sweden|0.49611084587430865|   21|
|           Venezuela| 0.4873785077642102|    1|
|             Finland| 0.4838133619075997|   11|
|              Canada| 0.4747138561682268|   62|
|      United Kingdom|0.46320190809985273|  196|
|      Dominican Rep.|  0.460902589254209|    2|
|         Puerto Rico|0.45904048944284015|    9|
|United States of ..

                                                                                

In [31]:
df_grouped_filtered = df_grouped_avg.filter("`count` >= 5")

df_grouped_filtered.sort(desc("average_hotness")).show()

+--------------------+-------------------+-----+
|             Country|    average_hotness|count|
+--------------------+-------------------+-----+
|              Greece| 0.5526171614856377|   11|
|              Sweden|0.49611084587430865|   21|
|             Finland| 0.4838133619075997|   11|
|              Canada| 0.4747138561682268|   62|
|      United Kingdom|0.46320190809985273|  196|
|         Puerto Rico|0.45904048944284015|    9|
|United States of ...|0.45108973407511016| 1045|
|             Ireland|0.44109942412052383|   20|
|              France| 0.4358283632954992|   31|
|              Norway|0.41953512369714985|    9|
|           Argentina| 0.4192021474399092|   20|
|             Germany| 0.4190852178483049|   27|
|           Australia|0.41671957351808814|   21|
|              Mexico|0.40130974618149634|    8|
|             Belgium| 0.3809288455589052|   10|
|              Brazil|0.37353138234505606|   15|
|             Jamaica| 0.3559408961638098|   31|
|                Cub

In [32]:
df_grouped_avg.sort(asc("average_hotness")).show()

+---------------+-------------------+-----+
|        Country|    average_hotness|count|
+---------------+-------------------+-----+
|    Philippines|0.21204540548371908|    1|
|          India|0.21204540548371908|    1|
|    Switzerland|0.21508031850922793|    3|
|         Panama| 0.2538347361322313|    1|
|        Moldova|0.26695518627553855|    1|
|        Lebanon|0.28638866060836154|    4|
|   South Africa|0.29986538443855504|    3|
|        Czechia| 0.3135621142479342|    1|
|    New Zealand|  0.315573290829234|    2|
|        Denmark|  0.316855271378245|    4|
|          Spain| 0.3374053766790793|    7|
|          Italy| 0.3376411294372094|    9|
|           Cuba| 0.3395021962197458|   13|
|        Jamaica| 0.3559408961638098|   31|
|         Brazil|0.37353138234505606|   15|
|        Belgium| 0.3809288455589052|   10|
|Dem. Rep. Congo| 0.3867024200396644|    1|
|    Netherlands|0.39200877147130697|    1|
|       Colombia| 0.3927974529186975|    3|
|          Japan|0.3944079224327

In [33]:
df_grouped_filtered.sort(asc("average_hotness")).show()

+--------------------+-------------------+-----+
|             Country|    average_hotness|count|
+--------------------+-------------------+-----+
|               Spain| 0.3374053766790793|    7|
|               Italy| 0.3376411294372094|    9|
|                Cuba| 0.3395021962197458|   13|
|             Jamaica| 0.3559408961638098|   31|
|              Brazil|0.37353138234505606|   15|
|             Belgium| 0.3809288455589052|   10|
|              Mexico|0.40130974618149634|    8|
|           Australia|0.41671957351808814|   21|
|             Germany| 0.4190852178483049|   27|
|           Argentina| 0.4192021474399092|   20|
|              Norway|0.41953512369714985|    9|
|              France| 0.4358283632954992|   31|
|             Ireland|0.44109942412052383|   20|
|United States of ...|0.45108973407511016| 1045|
|         Puerto Rico|0.45904048944284015|    9|
|      United Kingdom|0.46320190809985273|  196|
|              Canada| 0.4747138561682268|   62|
|             Finlan

In [34]:
# Check that total count is correct
df_grouped_avg.groupBy().sum().collect()

[Row(sum(average_hotness)=19.199595172814877, sum(count)=1625)]

In [35]:
print("Time taken: ", time.time() - start_time)

Time taken:  28.410476207733154


In [36]:
sc.stop()