# Data Wrangling with DataFrames

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, desc
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.window import Window

In [2]:
# Instantiates a Spark session 
spark = SparkSession \
        .builder \
        .appName("Data Wrangling with DataFrames") \
        .getOrCreate()

In [3]:
dataPath = "data/sparkify_log_small.json"
data = spark.read.json(dataPath)

# Dataset 

In [4]:
data.printSchema()
print("# Records: {} ".format(data.count()))

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

# Records: 10000 


In [5]:
data.head()

Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046')

# Question 1

Which page did user id "" (empty string) NOT visit?

In [6]:
data.describe("userId").show()
recordsForUserWithNoId = data.filter(data.userId == "")
print("# records for user with no id: {}".format(recordsForUserWithNoId.count()))

+-------+------------------+
|summary|            userId|
+-------+------------------+
|  count|             10000|
|   mean|1442.4413286423842|
| stddev| 829.8909432082621|
|    min|                  |
|    max|               999|
+-------+------------------+

# records for user with no id: 336


In [7]:
dataForUserWithNoId = data.select(["artist", "page"]).where(data.userId == "")
dataForUserWithNoId.show()

+------+-----+
|artist| page|
+------+-----+
|  null|Login|
|  null| Home|
|  null| Home|
|  null| Home|
|  null|Login|
|  null|Login|
|  null| Home|
|  null| Home|
|  null|Login|
|  null| Home|
|  null|Login|
|  null| Home|
|  null| Help|
|  null| Home|
|  null|Login|
|  null| Home|
|  null|Login|
|  null| Home|
|  null|Login|
|  null| Home|
+------+-----+
only showing top 20 rows



In [8]:
allPages = data.select("page").dropDuplicates().collect()
pagesVisitedByUser = dataForUserWithNoId.select("page").dropDuplicates().collect()

In [9]:
print("Available pages: {}\n".format(allPages))
print("Pages visited by user: {}\n".format(pagesVisitedByUser))

print("Pages not visited by user:")
for row in set(allPages) - set(pagesVisitedByUser):
    print(row.page)

Available pages: [Row(page='Submit Downgrade'), Row(page='Home'), Row(page='Downgrade'), Row(page='Logout'), Row(page='Save Settings'), Row(page='About'), Row(page='Settings'), Row(page='Login'), Row(page='NextSong'), Row(page='Help'), Row(page='Upgrade'), Row(page='Error'), Row(page='Submit Upgrade')]

Pages visited by user: [Row(page='Home'), Row(page='About'), Row(page='Login'), Row(page='Help')]

Pages not visited by user:
Submit Downgrade
Downgrade
Submit Upgrade
Error
Settings
Logout
NextSong
Upgrade
Save Settings


# Question 2

How many female users do we have in the data set?

In [10]:
numberOfFemaleUsers = data.select(["userId"]).filter(data.gender == "F").dropDuplicates().count()
print("# of female user in the dataset: {}".format(numberOfFemaleUsers))

# of female user in the dataset: 462


# Question 3

How many songs were played from the most played artist?

In [11]:
mostPlayedArtist = data.dropna(how = "any", subset = ["userId", "sessionId", "artist"]) \
                       .groupBy(data.artist) \
                       .count() \
                       .withColumnRenamed("count", "Times Played") \
                       .orderBy(data.artist) \
                       .sort("Times Played", ascending=False)

In [12]:
mostPlayedArtist.show()

+--------------------+------------+
|              artist|Times Played|
+--------------------+------------+
|            Coldplay|          83|
|       Kings Of Leon|          69|
|Florence + The Ma...|          52|
|            BjÃÂ¶rk|          46|
|       Dwight Yoakam|          45|
|       Justin Bieber|          43|
|      The Black Keys|          40|
|         OneRepublic|          37|
|        Jack Johnson|          36|
|                Muse|          36|
|           Radiohead|          31|
|        Taylor Swift|          29|
|               Train|          28|
|Barry Tuckwell/Ac...|          28|
|          Lily Allen|          28|
|          Nickelback|          27|
|           Daft Punk|          27|
|           Metallica|          27|
|          Kanye West|          26|
|Red Hot Chili Pep...|          24|
+--------------------+------------+
only showing top 20 rows



In [13]:
print("Most played artist:")
mostPlayedArtist.show(1)

Most played artist:
+--------+------------+
|  artist|Times Played|
+--------+------------+
|Coldplay|          83|
+--------+------------+
only showing top 1 row



# Question 4

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.



In [14]:
hasVisitedHomePage = udf(lambda visitedHomePage : int(visitedHomePage == "Home"), IntegerType())

In [15]:
user_window = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

In [16]:
usersVisitedHomePage = data.filter((data.page == "NextSong") | (data.page == "Home")) \
                           .select(["userId", "page", "song", "ts"]) \
                           .withColumn("visitedHomePage", hasVisitedHomePage(col("page"))) \
                           .withColumn('period', Fsum('visitedHomePage').over(user_window))

In [17]:
usersVisitedHomePage.show()

+------+--------+--------------------+-------------+---------------+------+
|userId|    page|                song|           ts|visitedHomePage|period|
+------+--------+--------------------+-------------+---------------+------+
|  1436|NextSong| Throw It In The Bag|1513783259284|              0|     0|
|  1436|NextSong|Atom Bomb (Atomix 2)|1513782858284|              0|     0|
|  2088|    Home|                null|1513805972284|              1|     1|
|  2088|NextSong|          Back To Me|1513805859284|              0|     1|
|  2088|NextSong|Keep On Hoping [F...|1513805494284|              0|     1|
|  2088|NextSong|              Shanti|1513805065284|              0|     1|
|  2088|NextSong|   Rest Of Your Life|1513804786284|              0|     1|
|  2088|NextSong|Inside The Fire (...|1513804555284|              0|     1|
|  2088|NextSong|             Siechen|1513804196284|              0|     1|
|  2088|NextSong|            Spectrum|1513803967284|              0|     1|
|  2088|Next

In [18]:
usersVisitedHomePage.filter((usersVisitedHomePage.page == 'NextSong')) \
                    .groupBy('userID', 'period') \
                    .agg({'period':'count'}) \
                    .agg({'count(period)':'avg'}) \
                    .withColumnRenamed("avg(count(period))", "Average time between a home page visiting") \
                    .show()

+-----------------------------------------+
|Average time between a home page visiting|
+-----------------------------------------+
|                        6.898347107438017|
+-----------------------------------------+

