# Data Wrangling with Spark

This is the code used in the previous screencast. Run each code cell to understand what the code does and how it works.

These first three cells import libraries, instantiate a SparkSession, and then read in the data set

In [44]:
import findspark
findspark.init('C:\spark')

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [34]:
spark = SparkSession \
    .builder \
    .appName("Wrangling Data") \
    .getOrCreate()

In [35]:
spark.sparkContext.getConf().getAll()

[('spark.app.name', 'Wrangling Data'),
 ('spark.app.id', 'local-1607017008900'),
 ('spark.driver.host', 'ssharma-w7.kryptonitesys.com'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '64074')]

In [40]:
path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)
print(type(user_log))

<class 'pyspark.sql.dataframe.DataFrame'>


# Data Exploration 

The next cells explore the data set.

In [5]:
user_log.take(5)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046'),
 Row(artist='Lily Allen', auth='Logged In', firstName='Elizabeth', gender='F', itemInSession=7, lastName='Chase', length=195.23873, level='free', location='Shreveport-Bossier City, LA', method='PUT', page='NextSong', registration=1512718541284, sessionId=5027, song='Cheryl Tweedy', status=200, ts=1513720878284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1000'),
 Row(artist='Cobra Starship Featuring Leighton Meester', auth='Logged In', firstNa

In [6]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [7]:
user_log.describe().show()

+-------+-----------------+----------+---------+------+------------------+--------+-----------------+-----+------------+------+-------+--------------------+------------------+--------+-----------------+-------------------+--------------------+------------------+
|summary|           artist|      auth|firstName|gender|     itemInSession|lastName|           length|level|    location|method|   page|        registration|         sessionId|    song|           status|                 ts|           userAgent|            userId|
+-------+-----------------+----------+---------+------+------------------+--------+-----------------+-----+------------+------+-------+--------------------+------------------+--------+-----------------+-------------------+--------------------+------------------+
|  count|             8347|     10000|     9664|  9664|             10000|    9664|             8347|10000|        9664| 10000|  10000|                9664|             10000|    8347|            10000|         

In [8]:
user_log.describe("artist").show()

+-------+-----------------+
|summary|           artist|
+-------+-----------------+
|  count|             8347|
|   mean|            461.0|
| stddev|            300.0|
|    min|              !!!|
|    max|ÃÂlafur Arnalds|
+-------+-----------------+



In [9]:
user_log.describe("sessionId").show()

+-------+------------------+
|summary|         sessionId|
+-------+------------------+
|  count|             10000|
|   mean|         4436.7511|
| stddev|2043.1281541827561|
|    min|                 9|
|    max|              7144|
+-------+------------------+



In [10]:
user_log.count()

10000

In [11]:
user_log.select("page").dropDuplicates().sort("page").show()

+----------------+
|            page|
+----------------+
|           About|
|       Downgrade|
|           Error|
|            Help|
|            Home|
|           Login|
|          Logout|
|        NextSong|
|   Save Settings|
|        Settings|
|Submit Downgrade|
|  Submit Upgrade|
|         Upgrade|
+----------------+



In [12]:
user_log.select(["userId", "firstname", "page", "song"]).where(user_log.userId == "1046").collect()

[Row(userId='1046', firstname='Kenneth', page='NextSong', song='Christmas Tears Will Fall'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Be Wary Of A Woman'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Public Enemy No.1'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Reign Of The Tyrants'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Father And Son'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='No. 5'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Seventeen'),
 Row(userId='1046', firstname='Kenneth', page='Home', song=None),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='War on war'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Killermont Street'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Black & Blue'),
 Row(userId='1046', firstname='Kenneth', page='Logout', song=None),
 Row(userId='1046', firstname='Kenneth'

# Calculating Statistics by Hour

In [13]:
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0). hour)

In [14]:
user_log = user_log.withColumn("hour", get_hour(user_log.ts))

In [15]:
user_log.head()

Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046', hour='14')

In [16]:
songs_in_hour = user_log.filter(user_log.page == "NextSong").groupby(user_log.hour).count().orderBy(user_log.hour.cast("float"))

In [17]:
songs_in_hour.show()

+----+-----+
|hour|count|
+----+-----+
|   0|  375|
|   1|  249|
|   2|  216|
|   3|  228|
|   4|  251|
|   5|  339|
|   6|  462|
|   7|  479|
|   8|  484|
|   9|  430|
|  10|  362|
|  11|  295|
|  12|  257|
|  13|  248|
|  14|  369|
|  15|  375|
|  16|  456|
|  17|  454|
|  18|  382|
|  19|  302|
+----+-----+
only showing top 20 rows



In [None]:
songs_in_hour_pd = songs_in_hour.toPandas()
songs_in_hour_pd.hour = pd.to_numeric(songs_in_hour_pd.hour)

In [None]:
plt.scatter(songs_in_hour_pd["hour"], songs_in_hour_pd["count"])
plt.xlim(-1, 24);
plt.ylim(0, 1.2 * max(songs_in_hour_pd["count"]))
plt.xlabel("Hour")
plt.ylabel("Songs played");

# Drop Rows with Missing Values

As you'll see, it turns out there are no missing values in the userID or session columns. But there are userID values that are empty strings.

In [37]:
user_log_valid = user_log.dropna(how = "any", subset = ["userId", "sessionId"])

In [39]:
user_log_valid.count()

<class 'pyspark.sql.dataframe.DataFrame'>


In [20]:
user_log.select("userId").dropDuplicates().sort("userId").show()

+------+
|userId|
+------+
|      |
|    10|
|   100|
|  1000|
|  1003|
|  1005|
|  1006|
|  1017|
|  1019|
|  1020|
|  1022|
|  1025|
|  1030|
|  1035|
|  1037|
|   104|
|  1040|
|  1042|
|  1043|
|  1046|
+------+
only showing top 20 rows



In [21]:
user_log_valid = user_log_valid.filter(user_log_valid["userId"] != "")

In [22]:
user_log_valid.count()

9664

# Users Downgrade Their Accounts

Find when users downgrade their accounts and then flag those log entries. Then use a window function and cumulative sum to distinguish each user's data as either pre or post downgrade events.

In [None]:
user_log_valid.filter("page = 'Submit Downgrade'").show()

In [None]:
user_log.select(["userId", "firstname", "page", "level", "song"]).where(user_log.userId == "1138").collect()

In [23]:
flag_downgrade_event = udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())

In [24]:
user_log_valid = user_log_valid.withColumn("downgraded", flag_downgrade_event("page"))

In [27]:
user_log_valid.head()

Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046', hour='14', downgraded=0)

In [29]:
from pyspark.sql import Window

In [30]:
windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)

In [31]:
user_log_valid = user_log_valid.withColumn("phase", Fsum("downgraded").over(windowval))

In [32]:
user_log_valid.select(["userId", "firstname", "ts", "page", "level", "phase"]).where(user_log.userId == "1138").sort("ts").collect()

[Row(userId='1138', firstname='Kelly', ts=1513729066284, page='Home', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729066284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729313284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729552284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729783284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730001284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730263284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730518284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730768284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513731182284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firs

In [66]:
function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())

user_window = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

cusum = user_log.filter((user_log.page == 'NextSong') | (user_log.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', function(col('page'))) \
    .withColumn('period', Fsum('homevisit').over(user_window))

In [68]:
cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}).show()

+------+------+-------------+
|userID|period|count(period)|
+------+------+-------------+
|  1436|     0|            2|
|  2088|     1|           13|
|  2162|     0|           19|
|  2162|     2|           15|
|  2294|     0|            4|
|  2294|     1|           17|
|  2294|     2|            3|
|  2294|     3|           16|
|  2294|     4|            4|
|  2294|     5|           11|
|  2904|     0|            1|
|   691|     1|            3|
|  1394|     0|            9|
|  1394|     1|           17|
|  2275|     0|            3|
|  2756|     0|            4|
|  2756|     2|            1|
|   451|     0|            1|
|   451|     1|            1|
|   800|     1|            2|
+------+------+-------------+
only showing top 20 rows



In [67]:
cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}).show()

+------------------+
|avg(count(period))|
+------------------+
| 6.898347107438017|
+------------------+



In [60]:
cusum2=cusum.where(cusum.page=="NextSong").groupBy("userId","period").count().withColumnRenamed("count","songs")

In [62]:
cusum2.show()

+------+------+-----+
|userId|period|songs|
+------+------+-----+
|  1436|     0|    2|
|  2088|     1|   13|
|  2162|     0|   19|
|  2162|     2|   15|
|  2294|     0|    4|
|  2294|     1|   17|
|  2294|     2|    3|
|  2294|     3|   16|
|  2294|     4|    4|
|  2294|     5|   11|
|  2904|     0|    1|
|   691|     1|    3|
|  1394|     0|    9|
|  1394|     1|   17|
|  2275|     0|    3|
|  2756|     0|    4|
|  2756|     2|    1|
|   451|     0|    1|
|   451|     1|    1|
|   800|     1|    2|
+------+------+-----+
only showing top 20 rows



In [65]:
cusum2.groupBy().avg('songs').show()

+-----------------+
|       avg(songs)|
+-----------------+
|6.898347107438017|
+-----------------+



In [69]:
print(type(user_log))

<class 'pyspark.sql.dataframe.DataFrame'>


In [70]:
user_log_pd = user_log.toPandas()

In [71]:
print(type(user_log_pd))

<class 'pandas.core.frame.DataFrame'>


In [72]:
user_log.createOrReplaceTempView("user_log_table")

In [95]:
user_log2 = spark.sql("SELECT artist, iteminsession FROM user_log_table order by userid desc LIMIT 5")

In [93]:
print(type(user_log2))

<class 'pyspark.sql.dataframe.DataFrame'>


In [96]:
print(user_log2.toPandas())

           artist  iteminsession
0        Rapper K              0
1          Enigma              0
2            None              1
3       Radiohead              1
4  Babylon Circus              1


In [97]:
spark.sql("select * from user_log_table").show()

+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|       Showaddywaddy|Logged In|  Kenneth|     M|          112| Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|
|          Lily Allen|Logged In|Elizabeth|     F|            7|    Chase|195.23873| free|Shreveport-Bossie...|   PUT

In [122]:
is_home = spark.sql('''select userid,page,ts,case when page='Home' then 1 else 0 end home from user_log_table 
where userid !='' and page in('Home','NextSong') 
order by userid, ts desc
''')

In [123]:
is_home.show()

+------+--------+-------------+----+
|userid|    page|           ts|home|
+------+--------+-------------+----+
|    10|NextSong|1513828388284|   1|
|    10|NextSong|1513790894284|   1|
|   100|    Home|1513839673284|  10|
|   100|NextSong|1513776308284|   1|
|   100|    Home|1513776194284|  10|
|   100|NextSong|1513775710284|   1|
|   100|NextSong|1513775556284|   1|
|   100|    Home|1513775431284|  10|
|   100|NextSong|1513750442284|   1|
|   100|NextSong|1513750214284|   1|
|  1000|NextSong|1513720878284|   1|
|  1003|    Home|1513749525284|  10|
|  1003|NextSong|1513749516284|   1|
|  1003|    Home|1513749501284|  10|
|  1005|NextSong|1513782278284|   1|
|  1006|NextSong|1513774019284|   1|
|  1006|NextSong|1513773777284|   1|
|  1006|NextSong|1513773548284|   1|
|  1017|NextSong|1513822643284|   1|
|  1017|    Home|1513822400284|  10|
+------+--------+-------------+----+
only showing top 20 rows



In [124]:
is_home.createOrReplaceTempView("is_home_table")

In [126]:
spark.sql('''
select userid,page,ts,home,
sum(home) over(partition by userid order by ts) sum_ishome 
from is_home_table
order by userid,ts desc
''').show()

+------+--------+-------------+----+----------+
|userid|    page|           ts|home|sum_ishome|
+------+--------+-------------+----+----------+
|    10|NextSong|1513828388284|   1|         1|
|    10|NextSong|1513790894284|   1|         2|
|   100|    Home|1513839673284|  10|        10|
|   100|NextSong|1513776308284|   1|        11|
|   100|    Home|1513776194284|  10|        21|
|   100|NextSong|1513775710284|   1|        22|
|   100|NextSong|1513775556284|   1|        23|
|   100|    Home|1513775431284|  10|        33|
|   100|NextSong|1513750442284|   1|        34|
|   100|NextSong|1513750214284|   1|        35|
|  1000|NextSong|1513720878284|   1|         1|
|  1003|    Home|1513749525284|  10|        10|
|  1003|NextSong|1513749516284|   1|        11|
|  1003|    Home|1513749501284|  10|        21|
|  1005|NextSong|1513782278284|   1|         1|
|  1006|NextSong|1513774019284|   1|         1|
|  1006|NextSong|1513773777284|   1|         2|
|  1006|NextSong|1513773548284|   1|    