# Data Wrangling with DataFrames questions_and_answers


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

In [4]:
spark = SparkSession \
    .builder \
    .appName("Data Frames practice") \
    .getOrCreate()

df = spark.read.json("data/sparkify_log_small.json")

# Question 1

Which page did user id "" (empty string) NOT visit?

In [5]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [11]:
# filter for users with blank userid
blank_pages = df.filter(df.userId == '') \
                .select(col('page') \
                .alias('blank_pages')).dropDuplicates()

In [23]:
blank_pages.collect()

[Row(blank_pages='Home'),
 Row(blank_pages='About'),
 Row(blank_pages='Login'),
 Row(blank_pages='Help')]

In [18]:
# get a list of possible pages that could be visited
all_pages = df.select('page').dropDuplicates()

In [22]:
# find values in all_pages that are not in black_pages
# these would be the pages that the blank user didn't visit
for row in set(all_pages.collect()) - set(blank_pages.collect()):
    print(row.page)

Settings
Save Settings
Logout
Submit Downgrade
Error
Submit Upgrade
NextSong
Downgrade
Upgrade


# Question 2

How many female users do we have in the data set?

In [51]:
df.dropDuplicates(subset=('userid',)).groupBy(['gender']).count().collect()

[Row(gender='F', count=462),
 Row(gender=None, count=1),
 Row(gender='M', count=501)]

In [26]:
# Another way to finding only count for females
df.filter(df.gender == 'F') \
    .select('userId', 'gender') \
    .dropDuplicates() \
    .count()

462

# Question 3

How many songs were played from the most played artist?

In [52]:
df.filter(df.page == 'NextSong') \
    .select('Artist') \
    .groupBy('Artist') \
    .agg({'Artist':'count'}) \
    .withColumnRenamed('count(Artist)', 'Artistcount') \
    .sort(desc('Artistcount')) \
    .show(1)

+--------+-----------+
|  Artist|Artistcount|
+--------+-----------+
|Coldplay|         83|
+--------+-----------+
only showing top 1 row



In [80]:
# Alternative method
df.groupBy(['Artist']).count() \
    .where(col("Artist").isNotNull()) \
    .sort(desc('count')).show(1)

+--------+-----+
|  Artist|count|
+--------+-----+
|Coldplay|   83|
+--------+-----+
only showing top 1 row



# Question 4

How many songs do users listen to on average between visiting the home page?



In [81]:
function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())

user_window = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

cusum = df.filter((df.page == 'NextSong') | (df.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', function(col('page'))) \
    .withColumn('period', Fsum('homevisit').over(user_window))

cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}).show()

+------------------+
|avg(count(period))|
+------------------+
| 6.898347107438017|
+------------------+

