# Data Wrangling with DataFrames Coding Quiz

Use this Jupyter notebook to find the answers to the quiz in the previous section. There is an answer key in the next part of the lesson.

In [1]:
import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf, desc, asc, col, sum as Fsum, countDistinct
from pyspark.sql.types import StringType, IntegerType

In [None]:
# Create (or reuse) the Spark session for this quiz, then read the small
# Sparkify log from JSON into the `user_log` DataFrame.
path = 'data/sparkify_log_small.json'
spark = (
    SparkSession.builder
    .appName('Wrangle Yangle Fangle Mangle Jangle')
    .getOrCreate()
)
user_log = spark.read.json(path)

In [None]:
# Peek at the first five rows as a list of Row objects.
user_log.head(5)

In [None]:
# Print the column names and types Spark read from the JSON file.
user_log.printSchema()

# Question 1

Which page did user id "" (empty string) NOT visit?

In [None]:
# Every distinct value of the `page` column across the whole log.
user_log.select("page").distinct().collect()

In [None]:
# Distinct pages recorded for rows whose user id is the empty string.
(
    user_log
    .select("page")
    .filter(user_log.userId == "")
    .distinct()
    .collect()
)

# Question 2 - Reflect

What type of user does the empty string user id most likely refer to?


In [None]:
# TODO: use this space to explore the behavior of the user with an empty string
# Distinct combinations of the session-level columns for the empty-string user id.
(
    user_log
    .select("auth", "level", "method", "page", "registration", "status")
    .filter(user_log.userId == "")
    .distinct()
    .collect()
)

# Question 3

How many female users do we have in the data set?

In [None]:
# Number of distinct user ids among rows where gender is "F".
user_log.select("userId").filter(user_log.gender == "F").distinct().count()

# Question 4

How many songs were played by the most-played artist?

In [None]:
# Count NextSong events per artist and show them with the largest count first.
(
    user_log
    .where(user_log.page == 'NextSong')
    .select('artist')
    .groupBy('artist')
    .agg({'artist': 'count'})
    .withColumnRenamed('count(artist)', 'artistCount')
    .orderBy(desc('artistCount'))
    .show()
)

# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.



In [None]:
# Show the schema again as a reference for the columns used in Question 5.
user_log.printSchema()

In [None]:
# Pull the full log onto the driver as a pandas DataFrame and preview it.
# From this cell on, `df` is a pandas object, not a Spark DataFrame.
df = user_log.toPandas()
df.head(5)

In [None]:
# Per-user window ordered by timestamp, with a range frame running from the
# start of the partition through the current row.
window = (
    Window
    .partitionBy("userId")
    .orderBy("ts")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

In [None]:
from pyspark.sql.functions import count

In [None]:
# Keep only the song-play and home-page events.
user_log.filter(user_log.page.isin("NextSong", "Home"))

In [None]:
# `x` is not assigned in any earlier cell, so this cell raised NameError on a
# fresh kernel (Restart & Run All).
# NOTE(review): `x` is assumed to be the NextSong/Home subset of the pandas
# frame `df`, mirroring the Spark filter in the preceding cell — confirm.
x = df[df.page.isin(['NextSong', 'Home'])]

# Row counts: full log vs. song-play/home-page events only.
len(df), len(x)

In [None]:
# Question 5: average number of songs played between Home page visits.
#
# Approach:
#   1. Keep only NextSong and Home events.
#   2. Flag each Home event with 1 (0 otherwise).
#   3. Per user, walking from newest to oldest event, take a running sum of
#      that flag; the running sum labels the "period" each row belongs to.
#   4. Count NextSong rows per (user, period) and average those counts.

# 1 for a Home page view, 0 for any other page.
function = udf(lambda ishome: int(ishome == 'Home'), IntegerType())

# Column is spelled `userId` to match the rest of the notebook instead of
# relying on Spark's case-insensitive name resolution.
user_window = (
    Window
    .partitionBy('userId')
    .orderBy(desc('ts'))
    .rangeBetween(Window.unboundedPreceding, 0)
)

# Start from the Spark DataFrame `user_log`: `df` was rebound to a pandas
# DataFrame by `toPandas()` and has no `select` / `withColumn`.
# `col` comes from pyspark.sql.functions (imported in the top import cell).
cusum = (
    user_log
    .filter((user_log.page == 'NextSong') | (user_log.page == 'Home'))
    .select('userId', 'page', 'ts')
    .withColumn('homevisit', function(col('page')))
    .withColumn('period', Fsum('homevisit').over(user_window))
)

# Songs per (user, period), then the mean of those counts.
(
    cusum
    .filter(cusum.page == 'NextSong')
    .groupBy('userId', 'period')
    .agg({'period': 'count'})
    .agg({'count(period)': 'avg'})
    .show()
)