### Functions

In the previous video, we used a number of functions to manipulate our DataFrame. Let's take a look at the different types of functions and their potential pitfalls.
### General functions

We have used the following general functions, which are quite similar to methods of pandas DataFrames:

- `select()`: returns a new DataFrame with the selected columns.
- `filter()`: filters rows using the given condition.
- `where()`: an alias for `filter()`.
- `groupBy()`: groups the DataFrame by the specified columns, so we can run aggregations on them.
- `sort()`: returns a new DataFrame sorted by the specified column(s). By default, the `ascending` keyword argument is `True`.
- `dropDuplicates()`: returns a new DataFrame with unique rows, based on all columns or just a subset of them.
- `withColumn()`: returns a new DataFrame by adding a column or replacing the existing column that has the same name. The first parameter is the name of the new column; the second is a Column expression describing how to compute it.
            
### Aggregate functions

Spark SQL provides built-in functions for the most common aggregations, such as `count()`, `countDistinct()`, `avg()`, `max()`, and `min()`, in the `pyspark.sql.functions` module. These are not the same as Python's built-in functions of the same name (for example `min()`, `max()`, or `sum()`), so be careful not to use them interchangeably. Importing a Spark function under an alias, as in `from pyspark.sql.functions import sum as Fsum`, avoids shadowing the Python built-in.

In many cases, there are multiple ways to express the same aggregation. For example, if we want to compute one type of aggregate for one or more columns of the DataFrame, we can simply chain the aggregate method after a `groupBy()`. If we want to apply different functions to different columns, `agg()` comes in handy. For example, `agg({"salary": "avg", "age": "max"})` computes the average salary and the maximum age.

### User defined functions (UDF)

In Spark SQL we can define our own functions with the `udf` method from the `pyspark.sql.functions` module.
The default return type of a UDF is `StringType`.
To return another type, we need to specify it explicitly as the `returnType`, using one of the types from the `pyspark.sql.types` module.

### Window functions

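Window functions combine the values of a range of rows in a DataFrame while still returning one value per row.
When defining the window, we choose how to group the rows (with the `partitionBy` method), how to sort them within each group (with `orderBy`),
and how wide a window we'd like to use (described by `rangeBetween` or `rowsBetween`). `rowsBetween` counts physical rows, while `rangeBetween` is based on the values of the ordering column.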

# Data Wrangling with Spark

This is the code used in the previous screencast. Run each code cell to understand what the code does and how it works.

These first three cells import libraries, instantiate a SparkSession, and then read in the data set.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
```

```python
spark = SparkSession \
    .builder \
    .appName("Wrangling Data") \
    .getOrCreate()
```


```python
path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)
```


# Data Exploration 

The next cells explore the data set.

```python
# take the first 5 rows of the DataFrame as a list of Row objects
user_log.take(5)
```

```python
type(user_log)
```

```python
user_log.printSchema()
```

```python
# show summary statistics (count, mean, stddev, min, max) for the numeric and string columns
user_log.describe().show()
```

```python
# summary statistics for a single column
user_log.describe("sessionId").show()
```

```python
# count the number of rows
user_log.count()
```

```python
# show the distinct values of the page column, sorted
user_log.select("page") \
        .dropDuplicates() \
        .sort("page") \
        .show()
```

```python
# show the log entries of a specific user ID
(user_log.select(["userId", "firstname", "page", "song"])
         .where(user_log.userId == "1046")
         .collect()
)
```

# Calculating Statistics by Hour

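```python
# udf = user-defined function; without a returnType it returns strings.
# ts is in milliseconds since the epoch; fromtimestamp() uses the Python worker's local time zone
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)
```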

```python
# add an "hour" column to user_log, computed from the ts column
user_log = user_log.withColumn("hour", get_hour(user_log.ts))
```

```python
user_log.head()
```

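```python
# count NextSong events per hour; "hour" is a string column (UDF default),
# so cast it to float to sort numerically rather than alphabetically
songs_in_hour = (user_log.filter(user_log.page == "NextSong")
                         .groupby(user_log.hour)
                         .count()
                         .orderBy(user_log.hour.cast("float"))
                )
```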

```python
songs_in_hour.show(24)
```

```python
# convert the small aggregated result to a pandas DataFrame;
# "hour" arrives as strings, so convert it to numbers for plotting
songs_in_hour_pd = songs_in_hour.toPandas()
songs_in_hour_pd.hour = pd.to_numeric(songs_in_hour_pd.hour)
```

```python
# plotting the number of songs played per hour
plt.bar(songs_in_hour_pd["hour"], songs_in_hour_pd["count"])
plt.xlim(-1, 24);
plt.ylim(0, 1.2 * max(songs_in_hour_pd["count"]))
plt.xlabel("Hour")
plt.ylabel("Songs played");
```

# Drop Rows with Missing Values

As you'll see, there are no missing values in the `userId` or `sessionId` columns. However, some `userId` values are empty strings.

```python
# drop rows with missing values in the userId and sessionId columns
# how: "any" or "all". With "any", drop a row if any of the subset's values is null;
# with "all", drop a row only if all of them are null. If thresh is specified, how is ignored.
user_log_valid = user_log.dropna(how="any", subset=["userId", "sessionId"])
```

```python
# list the distinct userId values; the empty string sorts first
user_log.select("userId") \
        .dropDuplicates() \
        .sort("userId") \
        .show()
```

```python
# remove rows whose userId is an empty string
user_log_valid = user_log_valid.filter(user_log_valid["userId"] != "")
```

```python
user_log_valid.count()
```

# Users Downgrade Their Accounts

Find when users downgrade their accounts and flag those log entries.
Then use a window function and a cumulative sum to label each user's data as either pre- or post-downgrade events.

```python
# look for users who downgraded
user_log_valid.filter("page = 'Submit Downgrade'").show()
```


```python
# inspect the log entries of user 1138
(user_log.select(["userId", "firstname", "page", "level", "song"])
    .where(user_log.userId == "1138")
    .collect()
)
```

```python
# creating a udf to flag the downgrade actions
flag_downgrade_event = udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())
```

```python
# flag downgrade actions in a new column "downgraded"
user_log_valid = user_log_valid.withColumn("downgraded", flag_downgrade_event("page"))
```

```python
user_log_valid.head()
```

```python
# import the SQL Window specification
from pyspark.sql import Window
```

```python
# define the window: one partition per user, newest event first (ts descending);
# the frame runs from the newest event down to the current row
windowval = (Window.partitionBy("userId")
                .orderBy(desc("ts"))
                .rangeBetween(Window.unboundedPreceding, 0)
            )
```

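```python
# cumulative sum of "downgraded" over the window: each row counts the downgrade events
# at or after it, so entries before a downgrade get a higher phase than entries after it
user_log_valid = user_log_valid.withColumn("phase", Fsum("downgraded").over(windowval))
```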

```python
# user 1138's entries in chronological order, with their phase
(user_log_valid.select(["userId", "firstname", "ts", "page", "level", "phase"])
    .where(user_log_valid.userId == "1138")
    .sort("ts")
    .collect()
)
```