CLEAN_DATA_URL = EVENT_DATA_URL.replace("/sparkify/", "/sparkify/output/02-cleaned-")
# \[02\] Data Introspection

## Setup Spark Session

for a detailed description what is done here see [01-setup-spark-session.ipynb](01-setup-spark-session.ipynb)

In [1]:
# EVENT_DATA_URL = "s3a://udacity/sparkify/sparkify_event_data.json"
EVENT_DATA_URL = "s3a://udacity-dsnd/sparkify/mini_sparkify_event_data.json"

CLEAN_DATA_URL = EVENT_DATA_URL.replace("/sparkify/", "/sparkify/output/02-cleaned-")

EXECUTOR_INSTANCES = 2
EXECUTOR_MEM = '6g'

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from cryptography.fernet import Fernet
import base64
import socket

!./install-s3-jars.sh

def decrypt(encrypted_text):
    """
    decrypts an encrypted text. The seed (master-password) for decryption is read from the file ".seed.txt"
    
    Input: encrypted_text
    
    Output: the decrypted text. If the text was not encrypted with the same seed, 
            an exception is raised.
    """
    with open('.seed.txt') as f:
        seed = f.read().strip()
    return Fernet(base64.b64encode((seed*32)[:32].encode('ascii')).decode('ascii')).decrypt(encrypted_text.encode('ascii')).decode('ascii')

AWS_ACCESS_KEY_ID='V6ge1JcQpvyYGJjb'
AWS_SECRET_ACCESS_KEY = decrypt('gAAAAABkDFI6865LaVJVgtTYo0aMx9-JTPbTo6cwOUjg5eNNPsZhBDoHbRZ8xuXQT0ImNfvqcecZuoJd1VzYQEpBaxyCnKvosii8O1KeqoL2NwKdKtL_AUfT4eW4dvJVP--VjEvc0gB4')
OWN_IP=socket.gethostbyname(socket.gethostname())
APP_NAME = "Sparkify"
SPARK_MASTER = "spark://bit-spark-master-svc.spark.svc.cluster.local:7077"
S3_HOST = "minio-api-service.minio.svc"

print(f'### SETUP SPARK SESSION "{APP_NAME}"')
spark = SparkSession.builder \
    .master(SPARK_MASTER) \
    .config("spark.jars","/home/jovyan/jars/aws-java-sdk-bundle-1.11.1026.jar,/home/jovyan/jars/hadoop-aws-3.3.2.jar") \
    .config("spark.driver.host", OWN_IP) \
    .config("spark.hadoop.fs.s3a.endpoint", S3_HOST) \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID) \
    .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.executor.instances", EXECUTOR_INSTANCES) \
    .config("spark.executor.memory", EXECUTOR_MEM) \
    .appName(APP_NAME).getOrCreate()
print(f"Spark version: {spark.version}")
sc = spark.sparkContext
sc.setLogLevel("WARN")

print(f"### LOAD DATA {EVENT_DATA_URL}")
df = spark.read.json(EVENT_DATA_URL)
print(f"### PERSIST df")
df_persist = df.persist()
df = df_persist

### SETUP SPARK SESSION "Sparkify"
Spark version: 3.3.2
### LOAD DATA s3a://udacity-dsnd/sparkify/mini_sparkify_event_data.json
### PERSIST df


In [2]:
df = df_persist

## Import necessary packages

In [3]:
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import IntegerType

## First look


In [4]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [5]:
df.show(5)

+----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|          artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|  Martha Tilston|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|     Bakersfield, CA|   PUT|NextSong|1538173362000|       29|           Rockpools|   200|1538352117000|Mozilla/5.0 (Wind...|    30|
|Five Iron Frenzy|Logged In|    Micah|     M|           79|    Long|236.09424| free|Boston-Cambridge-...|   PUT|NextSong|1538331630000| 

In [6]:
df.describe().show()

+-------+------------------+----------+---------+------+------------------+--------+------------------+------+-----------------+------+-------+--------------------+-----------------+--------------------+------------------+--------------------+--------------------+------------------+
|summary|            artist|      auth|firstName|gender|     itemInSession|lastName|            length| level|         location|method|   page|        registration|        sessionId|                song|            status|                  ts|           userAgent|            userId|
+-------+------------------+----------+---------+------+------------------+--------+------------------+------+-----------------+------+-------+--------------------+-----------------+--------------------+------------------+--------------------+--------------------+------------------+
|  count|            228108|    286500|   278154|278154|            286500|  278154|            228108|286500|           278154|286500| 286500|     

Looks like there are some encoding issues in `artist` and `song`  
Drop text columns and describe again:

In [7]:
df.drop("artist", "song", "userAgent", "firstName", "lastName", "location", "gender", "method", "level", "page", "auth").describe().show()

+-------+------------------+------------------+--------------------+-----------------+------------------+--------------------+------------------+
|summary|     itemInSession|            length|        registration|        sessionId|            status|                  ts|            userId|
+-------+------------------+------------------+--------------------+-----------------+------------------+--------------------+------------------+
|  count|            286500|            228108|              278154|           286500|            286500|              286500|            286500|
|   mean|114.41421291448516|249.11718197783375|1.535358834085557E12|1041.526554973822|210.05459685863875|1.540956889810471...| 59682.02278593872|
| stddev|129.76726201141105| 99.23517921058325| 3.291321616327434E9|726.7762634630799| 31.50507848842204| 1.507543960819524E9|109091.94999910535|
|    min|                 0|           0.78322|       1521380675000|                1|               200|       153835211700

In [8]:
df.drop("artist", "song", "userAgent").show()

+---------+---------+------+-------------+--------+---------+-----+--------------------+------+---------------+-------------+---------+------+-------------+------+
|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|           page| registration|sessionId|status|           ts|userId|
+---------+---------+------+-------------+--------+---------+-----+--------------------+------+---------------+-------------+---------+------+-------------+------+
|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|     Bakersfield, CA|   PUT|       NextSong|1538173362000|       29|   200|1538352117000|    30|
|Logged In|    Micah|     M|           79|    Long|236.09424| free|Boston-Cambridge-...|   PUT|       NextSong|1538331630000|        8|   200|1538352180000|     9|
|Logged In|    Colin|     M|           51| Freeman| 282.8273| paid|     Bakersfield, CA|   PUT|       NextSong|1538173362000|       29|   200|1538352394000|    30|
|Logged In|    M

### Get list of possible values for categorical columns

In [9]:
df.groupBy("auth").count().show()

+----------+------+
|      auth| count|
+----------+------+
|Logged Out|  8249|
| Cancelled|    52|
| Logged In|278102|
|     Guest|    97|
+----------+------+



In [10]:
df.groupBy("gender").count().show()

+------+------+
|gender| count|
+------+------+
|     F|154578|
|  null|  8346|
|     M|123576|
+------+------+



In [11]:
df.groupBy("level").count().show()

+-----+------+
|level| count|
+-----+------+
| free| 58338|
| paid|228162|
+-----+------+



In [12]:
df.groupBy("method").count().show()

+------+------+
|method| count|
+------+------+
|   PUT|261064|
|   GET| 25436|
+------+------+



In [13]:
pages = [x.page for x in df.select("page").distinct().collect()]
print(f"different page categories: {len(pages)}")
print(pages)

different page categories: 22
['Cancel', 'Submit Downgrade', 'Thumbs Down', 'Home', 'Downgrade', 'Roll Advert', 'Logout', 'Save Settings', 'Cancellation Confirmation', 'About', 'Settings', 'Login', 'Add to Playlist', 'Add Friend', 'NextSong', 'Thumbs Up', 'Help', 'Upgrade', 'Error', 'Submit Upgrade', 'Submit Registration', 'Register']


In [14]:
df.groupBy("status").count().show()

+------+------+
|status| count|
+------+------+
|   307| 26430|
|   404|   258|
|   200|259812|
+------+------+



### Get counts / ranges

In [15]:
import datetime

ts_first = df.agg(F.min(df.ts).alias("ts_first")).collect()[0].ts_first
ts_last = df.agg(F.max(df.ts).alias("ts_last")).collect()[0].ts_last

print(f"first timestamp: {datetime.datetime.fromtimestamp(ts_first/1000.0)}")
print(f"last timestamp: {datetime.datetime.fromtimestamp(ts_last/1000.0)}")

first timestamp: 2018-10-01 00:01:57
last timestamp: 2018-12-03 01:11:16


In [16]:
print(f'number of users: {df.select("userId").distinct().count()}')

number of users: 226


In [17]:
print(f'number of sessions: {df.select("sessionId").distinct().count()}')

number of sessions: 2354


In [18]:
df.select("ts", "sessionId", "itemInSession", "page").where(F.col("sessionId")==29).sort("ts").show(999)

+-------------+---------+-------------+---------------+
|           ts|sessionId|itemInSession|           page|
+-------------+---------+-------------+---------------+
|1538352117000|       29|           50|       NextSong|
|1538352394000|       29|           51|       NextSong|
|1538352676000|       29|           52|       NextSong|
|1538352899000|       29|           53|       NextSong|
|1538352905000|       29|           54|Add to Playlist|
|1538353084000|       29|           55|       NextSong|
|1538353218000|       29|           56|       NextSong|
|1538353441000|       29|           57|       NextSong|
|1538353687000|       29|           58|       NextSong|
|1538353909000|       29|           59|       NextSong|
|1538354132000|       29|           60|       NextSong|
|1538354365000|       29|           61|       NextSong|
|1538354584000|       29|           62|       NextSong|
|1538354806000|       29|           63|       NextSong|
|1538354945000|       29|           64|       Ne

In [19]:
print(f'number of locations: {df.select("location").distinct().count()}')

number of locations: 115


In [20]:
reg_first = df.agg(F.min(df.registration).alias("reg_first")).collect()[0].reg_first
reg_last = df.agg(F.max(df.registration).alias("reg_last")).collect()[0].reg_last

print(f"first registration: {datetime.datetime.fromtimestamp(reg_first/1000.0)}")
print(f"last registration: {datetime.datetime.fromtimestamp(reg_last/1000.0)}")

first registration: 2018-03-18 13:44:35
last registration: 2018-11-26 15:49:14


### Result of the first look

There are some encoding problems in artist and song.

`ts` is the timestamp, when the event (row) was recorded (in Milliseconds). Data was collected for two months in 2018 from October 1st to December 3rd.

`userId` is a user specific identifier, the dataset contains 226 users

`page` is the action, which triggered the event. There are 22 different page categories, like entering home-page, clicking the Next-Song Button, ...

`sessionId` identifies the Browser-Session of a user. `itemInSession` counts upwards the events in the user session. Has to be analyzed in more detail later, strange is itemInSession "0" is after the other timestamps 50-91.

`userAgent` identifies the browser, which the user uses (Mozilla, ...)

`method` shows whether a formular was sent (POST) or only a page was requested (GET). Most likely is this strongly coupled to the category in `page`

`status`HTTP return code for the request: 200 - "OK", 307 - "Temporary Redirect", 404 - "Page not found"

`song`, `artist` and `length` are related to the currently slected song. It is not very likely, that these information can give hints about a user churning

`auth` shows the session state (logged-in/logged-out/...). There are many null values.

`firstName`, `lastName`, `location`, `gender` are related to the user. `location` has 115 different values for 226 users, so it can not be seen as categorical. `firstName` and `lastName` should not have any relevance for churning, but `gender` might be relevant

`registration` is the timestamp, when the user registered for Sparkify. Registrations start in March, (when the famous new Sparkify service was launched) and also reach into the timeframe of the data set (end of November) 

`level` shows the current contract the user has, "free" or "paid"


# Cleanup

## Irrelevant columns

Based on the above analysis we can remove columns which are not relevant for our analysis

In [21]:
df = df.drop("artist", "auth", "firstName", "lastName", "length", "location", "method", "song", "userAgent")

## Empty User-ID

Remove events recorder for not logged in users (userId ist "")

In [22]:
print(f"events without userId: {df.filter(df.userId == '').count()}")

df = df.filter(df.userId != '')

print(f"number of events {df.count()}")
print(f"number of users {df.select(df.userId).distinct().count()}")

events without userId: 8346
number of events 278154
number of users 225


## Add ID column

The timestamp field `ts` is not unique, so it can not be used to identify an event. 
Add a new columne with name ID sorted in ascending `ts` order.

In [23]:
print(f"duplicate timestamps: {df.select('ts').count() - df.select('ts').distinct().count()}")

w = Window().orderBy("ts")
df = df.withColumn("id", F.row_number().over(w))

print(f"duplicate ids: {df.select('id').count() - df.select('id').distinct().count()}")


duplicate timestamps: 8384
duplicate ids: 0


## Dummy Encode page category

The `page` column contains categorical values.  
To make the different values accessible as seperate features for training the values are dummy encoded using the Spark `pivot()` function.

The automatically generated column names are renamed to have the prefix "pg_" with only lowercase letters and underscore instead of space.

Because of the big number of categories the first column is not dropped.

### Define metadata for each pagename

This metadata is mainly used for visualizations or pretty printing.

In [24]:
page_info = {
 'Error':                     {'id':  5, 'color': 'red'},

 'Home':                      {'id': 10, 'color': 'gray'},
 'Help':                      {'id': 11, 'color': 'gray'},
 'About':                     {'id': 12, 'color': 'gray'},

 'Login':                     {'id': 14, 'color': 'gray'},
 'Logout':                    {'id': 15, 'color': 'gray'},
    
 'Settings':                  {'id': 17, 'color': 'gray'},
 'Save Settings':             {'id': 18, 'color': 'gray'},
 'Add Friend':                {'id': 19, 'color': 'gray'},

 'Thumbs Down':               {'id': 22, 'color': 'gray'},
 'Thumbs Up':                 {'id': 23, 'color': 'gray'},
 'Add to Playlist':           {'id': 24, 'color': 'gray'},
 'NextSong':                  {'id': 25, 'color': 'gray'},

 'Roll Advert':               {'id': 27, 'color': 'gray'},

 'Register':                  {'id': 30, 'color': 'yellow'},
 'Upgrade':                   {'id': 31, 'color': 'yellow'},

 'Submit Registration':       {'id': 34, 'color': 'green'},
 'Submit Upgrade':            {'id': 35, 'color': 'green'},

 'Downgrade':                 {'id': 38, 'color': 'orange'},
 'Cancel':                    {'id': 39, 'color': 'orange'},

 'Submit Downgrade':          {'id': 42, 'color': 'red'},
    
 'Cancellation Confirmation': {'id': 44, 'color': 'red'},

}

Add a normalized name to `page_info` without space and all lowercase, which can be used for the dummy column names.

In [25]:
def norm_colname(name):
    """
    Input: name which can contain spaces with upper and lowercase letters.
    Output: all spaces replaced with an underscore and all letters converted to lowercase
    """
    return name.replace(' ', '_').lower()

## add "colname" to dictionary page_info
for k in page_info.keys():
    page_info[k]["colname"] = "pg_"+norm_colname(k)

### Create "pg_..."  features

Using the `pivot()` function for each page category a seperate column will be created. The column name is prefixed with "pg_" and normalized.  
Therefore the metadata stored in `page_info` is used.

In [26]:
page_features = df.groupBy("id").pivot("page").agg(F.lit(1)).na.fill(0)
page_features = page_features.toDF(*((page_info[col]["colname"]) if col!="id" else "id" for col in page_features.columns))
page_features.show(10)

+---+--------+-------------+------------------+---------+----------------------------+------------+--------+-------+-------+---------+-----------+--------------+----------------+-----------+-------------------+-----------------+--------------+------------+----------+
| id|pg_about|pg_add_friend|pg_add_to_playlist|pg_cancel|pg_cancellation_confirmation|pg_downgrade|pg_error|pg_help|pg_home|pg_logout|pg_nextsong|pg_roll_advert|pg_save_settings|pg_settings|pg_submit_downgrade|pg_submit_upgrade|pg_thumbs_down|pg_thumbs_up|pg_upgrade|
+---+--------+-------------+------------------+---------+----------------------------+------------+--------+-------+-------+---------+-----------+--------------+----------------+-----------+-------------------+-----------------+--------------+------------+----------+
|  1|       0|            0|                 0|        0|                           0|           0|       0|      0|      0|        0|          1|             0|               0|          0|      

### Looking at the numbers

Aggregate the sum  for all page columns to get an impression how often which action was triggered: 

In [27]:
aggs  = {x: "sum" for x in page_features.columns if x != 'id'}
pagesum = page_features.agg(aggs)
pagesum.show()
print(f'number of cancellations: {pagesum.collect()[0].asDict()["sum(pg_cancellation_confirmation)"]}')
print(f'number of downgrades: {pagesum.collect()[0].asDict()["sum(pg_submit_downgrade)"]}')

+-------------+------------------+--------------+-----------------------+-------------------+------------+-----------------+----------------------+------------------------+--------------+-------------+---------------------------------+----------------+----------------+---------------+-----------------+------------+---------------------+-------------------+
|sum(pg_error)|sum(pg_add_friend)|sum(pg_cancel)|sum(pg_add_to_playlist)|sum(pg_roll_advert)|sum(pg_home)|sum(pg_downgrade)|sum(pg_submit_upgrade)|sum(pg_submit_downgrade)|sum(pg_logout)|sum(pg_about)|sum(pg_cancellation_confirmation)|sum(pg_nextsong)|sum(pg_settings)|sum(pg_upgrade)|sum(pg_thumbs_up)|sum(pg_help)|sum(pg_save_settings)|sum(pg_thumbs_down)|
+-------------+------------------+--------------+-----------------------+-------------------+------------+-----------------+----------------------+------------------------+--------------+-------------+---------------------------------+----------------+----------------+-------------

This means, there were 115 churns, 52 cancels and 63 downgrades

### Add page features to df

The newly generated page features are joined to our dataframe using the "id" key.  
We keep the original "page" column, even if it is already vectorized and not needed for training.  
But we will use it for visualization.

In [28]:
df = df.join(page_features, "id")

### Vectorize level

`Level` can have only two values and is never null. So we can vectorize this column.  
The new column `paid` is 1, if level=="paid":

In [29]:
df.groupBy("level").count().show()

+-----+------+
|level| count|
+-----+------+
| paid|222433|
| free| 55721|
+-----+------+



In [30]:
df = df.withColumn("paid", (df.level == 'paid').cast('int'))
df = df.drop("level")

### Vectorize gender

`gender` was not binary. There were 8346 null value.  
We removed 8346 users with userId == "".  
Maybe the null values are now gone:

In [31]:
df.groupBy("gender").count().show()

+------+------+
|gender| count|
+------+------+
|     M|123576|
|     F|154578|
+------+------+



`gender` is now  binary. We can vectorize it in the same way as label.  
The new column `male` is 1, if gender=="male":

In [32]:
df = df.withColumn("male", (df.gender == 'M').cast('int'))
df = df.drop("gender")

### Vectorize status

`status` contains the http code returned (200, 307, 404). The code 200 ("OK") can be dropped.df.groupBy("level").count().show()

In [34]:
df.groupBy("status").count().show()

+------+------+
|status| count|
+------+------+
|   200|254718|
|   307| 23184|
|   404|   252|
+------+------+



In [35]:
status_features = df.groupBy("id").pivot("status").agg(F.lit(1)).na.fill(0)
status_features = status_features.toDF(*(("status_"+col) if col != "id" else "id" for col in status_features.columns)).drop("status_200")
df = df.join(status_features, "id")
df = df.drop("status")

### Sessions

Sessions start, when a users first visits the Sparkify page and ends, when the browser is closed or after some time of inactivity.
We recognized an anomaly, when itemInSession dropped from 91 back to 0.

In [36]:
df.select("ts", "sessionId", "itemInSession", "page", "userId").where(F.col("sessionId")==29).sort("ts").show(999)

+-------------+---------+-------------+---------------+------+
|           ts|sessionId|itemInSession|           page|userId|
+-------------+---------+-------------+---------------+------+
|1538352117000|       29|           50|       NextSong|    30|
|1538352394000|       29|           51|       NextSong|    30|
|1538352676000|       29|           52|       NextSong|    30|
|1538352899000|       29|           53|       NextSong|    30|
|1538352905000|       29|           54|Add to Playlist|    30|
|1538353084000|       29|           55|       NextSong|    30|
|1538353218000|       29|           56|       NextSong|    30|
|1538353441000|       29|           57|       NextSong|    30|
|1538353687000|       29|           58|       NextSong|    30|
|1538353909000|       29|           59|       NextSong|    30|
|1538354132000|       29|           60|       NextSong|    30|
|1538354365000|       29|           61|       NextSong|    30|
|1538354584000|       29|           62|       NextSong|

The anomaly has vanished, most likely, because the empty user was removed.  
That means, the same session id can occurr at different times for different users.  
Let´s verify this:

In [37]:
df.dropDuplicates(["sessionId", "userId"]).groupBy("sessionId").count().where(F.col("count")>1).show(5)

+---------+-----+
|sessionId|count|
+---------+-----+
|      217|    3|
|       53|    4|
|      240|    3|
|      100|    4|
|      187|    4|
+---------+-----+
only showing top 5 rows



sessionId "65" occurrs for four different users. To verify this:

In [38]:
df.where(df.sessionId==65).groupBy("userId").agg(F.min(F.col("ts")), F.count(F.col("userId"))).show()

+------+-------------+-------------+
|userId|      min(ts)|count(userId)|
+------+-------------+-------------+
|    66|1538682135000|           15|
|200021|1538717365000|           10|
|100013|1538841588000|           73|
|300023|1538984097000|          270|
+------+-------------+-------------+



The duplicate session IDs make things complicated (similair as the duplicate "ts" above).  
In the same way, as we created the new "id" column we will create a synthetic "sid" column, which is unique over all users and times.  
Let´s check, whether the session id is unique per user:

In [39]:
df.groupBy("sessionId", "userId", "itemInSession").count().where(F.col("count")>1).show()

+---------+------+-------------+-----+
|sessionId|userId|itemInSession|count|
+---------+------+-------------+-----+
+---------+------+-------------+-----+



So there are no duplicate session ids for the same userId.  
Now we can create the "sid":

In [43]:
df_sess_user = df.select("sessionId", "userId").dropDuplicates()
w = Window().orderBy("sessionId", "userId")
df_sess_user = df_sess_user.withColumn("sid", F.row_number().over(w))
df = df.join(df_sess_user, ["sessionId", "userId"])

In [46]:
df.select("sid").dropDuplicates().count()

3176

So, we have now 3176 unique "sid"s. The number of different "sessionId"s was only 2345

#### Add Session-Start Marker

It might be helpful to have a marker at the beginning of each session.  

In [47]:
df_session_start = df.groupBy("sid").agg(F.min("id").alias("id")).drop("sid").withColumn("session_start", F.lit(1).cast("int"))
df = df.join(df_session_start, "id", how="outer").fillna(0)

# alternative possibility to mark session starts
# w = Window.partitionBy('sessionId').orderBy(F.col('id'))
# df = df.withColumn('sessionstart', F.coalesce((F.lag('sessionId').over(w) != F.col('sessionId')).cast('int'), F.lit(1)))

Now we can drop the old session columns

In [50]:
df = df.drop("sessionId", "itemInSession")

# Finished

The dataset cleanup is finished. the new Schema looks like this:

In [51]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- userId: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- pg_about: integer (nullable = true)
 |-- pg_add_friend: integer (nullable = true)
 |-- pg_add_to_playlist: integer (nullable = true)
 |-- pg_cancel: integer (nullable = true)
 |-- pg_cancellation_confirmation: integer (nullable = true)
 |-- pg_downgrade: integer (nullable = true)
 |-- pg_error: integer (nullable = true)
 |-- pg_help: integer (nullable = true)
 |-- pg_home: integer (nullable = true)
 |-- pg_logout: integer (nullable = true)
 |-- pg_nextsong: integer (nullable = true)
 |-- pg_roll_advert: integer (nullable = true)
 |-- pg_save_settings: integer (nullable = true)
 |-- pg_settings: integer (nullable = true)
 |-- pg_submit_downgrade: integer (nullable = true)
 |-- pg_submit_upgrade: integer (nullable = true)
 |-- pg_thumbs_down: integer (nullable = true)
 |-- pg_thumbs_up: integer (nu

## Persist dataset

now we have finished all transformations, we can persist this state to reduce recalcuation of the transformations.

In [54]:
df_old_persist = df_persist
df_persist = df.persist()
df_old_persist.unpersist()
df = df_persist

## Save to S3

Following the ETL process, the extracted data is now written into a new file, which can be loaded for the next processing step

In [58]:
print(f"### SAVING CLEANED DATA {CLEAN_DATA_URL}")
df.write.format('json').mode('overwrite').save(CLEAN_DATA_URL)

### SAVING CLEANED DATA s3a://udacity-dsnd/sparkify/output/02-cleaned-mini_sparkify_event_data.json


In [59]:
spark.stop()